Coverage for backend/django/flowsheetInternals/unitops/models/SimulationObject.py: 84%
521 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-12-18 04:00 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-12-18 04:00 +0000
1from django.db import models
2from django.db.models import QuerySet
3from core.managers import SoftDeleteManager
4from core.auxiliary.enums.unitOpData import SimulationObjectClass
5from core.auxiliary.models.Flowsheet import Flowsheet
6from core.auxiliary.models.PropertySet import PropertySet
7from core.auxiliary.models.PropertyInfo import PropertyInfo
8from typing import TYPE_CHECKING
9from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate
10from core.auxiliary.models.IndexedItem import IndexedItem, IndexChoices
11from typing import Set
13from flowsheetInternals.propertyPackages.models.SimulationObjectPropertyPackages import SimulationObjectPropertyPackages
14from flowsheetInternals.unitops.models.Port import Port
15from typing import Iterable
16from flowsheetInternals.unitops.config.config_methods import *
17from common.config_types import *
18import itertools
20from .compound_propogation import update_compounds_on_set, update_compounds_on_merge, _get_compound_keys, \
21 update_decision_node_and_propagate, update_compounds_on_add_stream
22from typing import Optional, List
23from ..methods.add_expression import add_expression as _add_expression
25if TYPE_CHECKING:
26 from core.auxiliary.models.PropertySet import PropertySet
27 from core.auxiliary.models.PropertyInfo import PropertyInfo
28 from core.auxiliary.models.RecycleData import RecycleData
29 from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject
30 from flowsheetInternals.graphicData.models.groupingModel import Grouping
31 from flowsheetInternals.unitops.models.Port import Port
32 from flowsheetInternals.propertyPackages.models.SimulationObjectPropertyPackages import \
33 SimulationObjectPropertyPackages
36class SimulationObject(models.Model):
37 flowsheet = models.ForeignKey(Flowsheet, on_delete=models.CASCADE, related_name="flowsheetObjects")
38 componentName = models.CharField(max_length=64)
39 objectType = models.CharField(choices=SimulationObjectClass.choices)
41 created_at = models.DateTimeField(auto_now_add=True)
42 is_deleted = models.BooleanField(default=False)
43 initial_values = models.JSONField(null=True, blank=True)
45 # add a soft delete manager
46 objects = SoftDeleteManager()
47 add_expression = _add_expression
48 # runtime-accessed attributes
49 properties: "PropertySet"
50 graphicObject: "GraphicObject"
51 ports: QuerySet["Port"]
52 connectedPorts: QuerySet["Port"]
53 propertyPackages: "SimulationObjectPropertyPackages"
54 recycleConnection: "Optional[RecycleData]"
55 recycleData: "Optional[RecycleData]"
57 @property
58 def schema(self) -> ObjectType:
59 return get_object_schema(self)
61 @property
62 def has_recycle_connection(self) -> bool:
63 return hasattr(self, "recycleConnection")
65 def is_stream(self) -> bool:
66 return self.schema.is_stream
68 def get_stream(self, key: str, index: int = 0) -> "SimulationObject":
69 """
70 Returns the stream attached to the port with the given key
71 """
72 port = self.get_port(key, index)
73 stream = port.stream
74 return stream
76 def get_group(self) -> "Grouping":
77 """
78 Returns the group that this object belongs to
79 """
80 if not self.is_stream(): 80 ↛ 85line 80 didn't jump to line 85 because the condition on line 80 was always true
81 # There is only one graphic object, so we can just return the group
82 return self.graphicObject.last().group
83 else:
84 # Streams arent' really in a group, throw an error
85 raise ValueError("Streams do not belong to a group")
87 def get_groups(self) -> Iterable["Grouping"]:
88 """
89 Returns an iterable of groups that this object belongs to. Mostly needed for streams that have many groups.
90 """
91 return (graphic.group for graphic in self.graphicObject.all())
93 def get_parent_groups(self) -> List["Grouping"]:
94 """
95 Returns an iterable of parent groups that this object belongs to.
96 """
97 parent_groups = []
98 current_group = self.get_group()
99 while current_group:
100 parent_groups.append(current_group)
101 simulationObject = current_group.simulationObject
102 if simulationObject is None: 102 ↛ 103line 102 didn't jump to line 103 because the condition on line 102 was never true
103 break
104 current_group = simulationObject.get_group()
105 return parent_groups
107 def get_property_package(self, name: str | None = None) -> SimulationObjectPropertyPackages:
108 """
109 Returns the property package slot with the given name
110 """
111 property_package_slot: SimulationObjectPropertyPackages
112 if name is None:
113 property_package_slot = self.propertyPackages.first()
114 else:
115 property_package_slot = self.propertyPackages.get(name=name)
116 return property_package_slot
118 def set_property_package(self, property_package, name: str | None = None) -> None:
119 """
120 Sets the property package for this object
121 """
122 property_package_slot = self.get_property_package(name)
123 property_package_slot.propertyPackage = property_package
124 property_package_slot.save()
126 def get_port(self, key: str, index: int = 0) -> Port:
127 """
128 Returns the port with the given key
129 """
130 try:
132 port: Port = self.ports.get(key=key, index=index)
133 return port
134 except Port.DoesNotExist:
135 raise ValueError(f"Port with key {key} does not exist on object {self.componentName}")
137 def reorder_object_ports(self):
138 """
139 Reorders port mappings by connected unit operation y position
140 :return: None
141 """
142 inlet_connections = [port for port in self.ports.filter(direction=ConType.Inlet).all()]
143 outlet_connections = [port for port in self.ports.filter(direction=ConType.Outlet).all()]
144 inlet_connections.sort(
145 key=lambda port: port.stream.connectedPorts.get(direction=ConType.Outlet).unitOp.graphicObject.last().y)
146 outlet_connections.sort(
147 key=lambda port: port.stream.connectedPorts.get(direction=ConType.Inlet).unitOp.graphicObject.last().y)
148 for i, port in enumerate(inlet_connections):
149 port.index = i
150 port.save()
151 for i, port in enumerate(outlet_connections):
152 port.index = i
153 port.save()
154 self.save()
156 def horizontally_center_graphic(self) -> None:
157 """
158 Horizontally centers GraphicObject for a stream or unit operation
159 Currently uni-directional - left to right connections
160 Centers based on inlet and outlet graphics.
161 :return: None
162 """
163 # TODO: Make this work as appropriate for streams with multiple graphic objects
164 # Is an Intermediate Stream
165 if self.objectType == SimulationObjectClass.Stream or SimulationObjectClass.EnergyStream or SimulationObjectClass.acStream and self.connectedPorts.count() > 1: 165 ↛ 178line 165 didn't jump to line 178 because the condition on line 165 was always true
166 inlet, outlet = self.connectedPorts.filter(direction=ConType.Outlet).first(), self.connectedPorts.filter(
167 direction=ConType.Inlet).first()
168 inlet_x, outlet_x = inlet.unitOp.graphicObject.last().x, outlet.unitOp.graphicObject.last().x
169 if inlet_x <= outlet_x:
170 self.graphicObject.last().x = abs(
171 (inlet_x + (inlet.unitOp.graphicObject.last().width)) + (outlet_x)) / 2
172 else:
173 self.graphicObject.last().x = abs(inlet_x + (outlet_x + outlet.unitOp.graphicObject.last().width)) / 2
174 # Flip Graphic Object horizontally
175 self.graphicObject.last().save()
176 self.save()
177 else:
178 inlet, outlet = self.ports.filter(direction=ConType.Inlet).first(), self.ports.filter(
179 direction=ConType.Outlet).first()
180 self.graphicObject.last().x = abs(
181 (inlet.stream.graphicObject.last().x + outlet.stream.graphicObject.last().x) / 2)
182 self.graphicObject.last().save()
183 self.save()
184 return
186 def vertically_center_graphic(self) -> None:
187 """
188 Vertically centers GraphicObject for a stream or unit operation
189 :return: None
190 """
191 # TODO: Make this work as appropriate for streams with multiple graphic objects
192 # Is an intermediate stream
193 if self.objectType == SimulationObjectClass.Stream or SimulationObjectClass.EnergyStream or SimulationObjectClass.acStream and self.connectedPorts.count() > 1:
194 inlet, outlet = self.connectedPorts.filter(direction=ConType.Inlet).first(), self.connectedPorts.filter(
195 direction=ConType.Outlet).first()
196 self.graphicObject.last().y = abs(
197 (inlet.unitOp.graphicObject.last().y + outlet.unitOp.graphicObject.last().y) / 2)
198 self.graphicObject.last().save()
199 self.save()
200 else:
201 inlet, outlet = self.ports.filter(direction=ConType.Inlet).first(), \
202 self.ports.filter(direction=ConType.Outlet).all()[1]
203 self.graphicObject.last().y = abs(
204 (inlet.stream.graphicObject.last().y + outlet.stream.graphicObject.last().y) / 2)
205 self.graphicObject.last().save()
206 self.save()
207 return
209 def split_stream(self) -> "SimulationObject":
210 """
211 Splits a stream into two separate streams (one inlet and one outlet - disconnected).
212 :return: New Stream Object (outlet stream) if object is a stream.
213 """
214 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
215 from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject
217 new_stream = None
218 if self.objectType == SimulationObjectClass.Stream or SimulationObjectClass.EnergyStream or SimulationObjectClass.acStream: 218 ↛ 287line 218 didn't jump to line 287 because the condition on line 218 was always true
219 connectedPorts = self.connectedPorts.all()
220 inlet_port: Port = connectedPorts.get(direction=ConType.Inlet)
221 outlet_port: Port = connectedPorts.get(direction=ConType.Outlet)
222 new_stream = SimulationObjectFactory.create_stream_at_port(inlet_port)
223 new_stream.save()
224 # reset the stream to default position
225 coordinates = SimulationObjectFactory.default_stream_position(outlet_port.unitOp, outlet_port)
226 stream_graphic_object = self.graphicObject.last()
227 stream_graphic_object.x = coordinates['x'] - stream_graphic_object.width / 2
228 stream_graphic_object.y = coordinates['y'] - stream_graphic_object.height / 2
229 stream_graphic_object.save()
231 """
232 If a stream connects two groups together, it will have graphic objects in each of those groups.
233 This next section figures out which graphic object to keep, and which to move to the new stream.
234 """
235 all_graphic_objects = self.graphicObject.all()
236 default_graphic_object = new_stream.graphicObject.last() # a default graphic object is created by create_stream_at_port
237 # The graphic objects in these groups should be kept
238 groups_to_keep: List[int] = [g.id for g in outlet_port.unitOp.get_parent_groups()]
239 groups_to_move: List[int] = [g.id for g in inlet_port.unitOp.get_parent_groups()]
241 for gobj in all_graphic_objects:
242 if gobj.group.id in groups_to_keep:
243 # keep it here
244 if gobj.group.id in groups_to_move:
245 # move the default graphic object into this group, so both the inlet and outlet show in this group
246 default_graphic_object.group = gobj.group
247 default_graphic_object.save()
248 else:
249 if gobj.group.id not in groups_to_move: 249 ↛ 250line 249 didn't jump to line 250 because the condition on line 249 was never true
250 print("Error: this should be in either ggroups to move or the other", gobj.group)
251 # must be in groups_to_move
252 # move graphic object to this stream
253 gobj.simulationObject = new_stream
254 gobj.save()
256 # make sure both the original and new streams are shown in all relevant groups
257 for graphic_object in self.graphicObject.all():
258 current_group = graphic_object.group
259 while current_group:
260 parent_group = current_group.get_parent_group()
261 if parent_group:
262 # make sure the original stream is shown in the parent group
263 if not self.graphicObject.filter(group=parent_group).exists(): 263 ↛ 264line 263 didn't jump to line 264 because the condition on line 263 was never true
264 GraphicObject.objects.create(
265 flowsheet=self.flowsheet,
266 simulationObject=self,
267 width=graphic_object.width,
268 height=graphic_object.height,
269 x=graphic_object.x,
270 y=graphic_object.y,
271 group=parent_group,
272 )
274 # make sure the new stream is shown in the parent group
275 if not new_stream.graphicObject.filter(group=parent_group).exists(): 275 ↛ 276line 275 didn't jump to line 276 because the condition on line 275 was never true
276 GraphicObject.objects.create(
277 flowsheet=self.flowsheet,
278 simulationObject=new_stream,
279 width=graphic_object.width,
280 height=graphic_object.height,
281 x=graphic_object.x,
282 y=graphic_object.y,
283 group=parent_group
284 )
285 current_group = parent_group
287 return new_stream
289 def merge_parallel_streams(self, connected_stream: "SimulationObject", decision_node=None,
290 coordinates=None) -> "SimulationObject":
291 """
292 Merges two streams that have the same direction (Both Inlet or Outlet)
293 :param connected_stream: Stream to connect
294 :param decision_node: Decision Node object to merges streams into (In the case of n-inlet / n-outlet)
295 :return: Decision Node Object
296 """
297 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
298 is_inlet = self.connectedPorts.first().direction == ConType.Inlet
299 direction = ConType.Outlet if is_inlet else ConType.Inlet
300 if decision_node is None:
301 modified_schema: ObjectType = configuration["decisionNode"].model_copy(deep=True)
302 if (is_inlet): 302 ↛ 306line 302 didn't jump to line 306 because the condition on line 302 was always true
303 modified_schema.ports["outlet"].default = 2
304 modified_schema.ports["inlet"].default = 2
305 else:
306 modified_schema.ports["inlet"].default = 2
307 modified_schema.ports["outlet"].default = 2
308 decision_node = SimulationObjectFactory.create_simulation_object(
309 coordinates={'x': self.graphicObject.last().x,
310 'y': self.graphicObject.last().y} if coordinates is None else coordinates,
311 objectType="decisionNode",
312 schema=modified_schema,
313 flowsheet=self.flowsheet,
314 create_attached_streams=False,
315 )
316 decision_node.graphicObject.last().rotation = self.connectedPorts.first().unitOp.graphicObject.last().rotation
317 decision_node.graphicObject.last().save()
318 ports = decision_node.ports.filter(direction=direction).all()
319 port1 = ports[0]
320 port2 = ports[1]
321 port1.stream = self
322 port2.stream = connected_stream
323 port1.save()
324 port2.save()
326 decision_node.save()
328 # Center GraphicObjects
329 self.horizontally_center_graphic()
330 connected_stream.horizontally_center_graphic()
331 self.save()
332 connected_stream.save()
334 return decision_node
336 def merge_stream(self, connectStream: "SimulationObject") -> "SimulationObject":
337 """
338 Connects this object (a stream) to another stream
339 :param connectedStream: Stream to connect to this object
340 :return: Decision Node Object
341 """
342 material_stream_1 = self
343 material_stream_2 = connectStream
345 material_stream_1_port = material_stream_1.connectedPorts.first()
346 material_stream_2_port = material_stream_2.connectedPorts.first()
348 # Case 1: Intermediate to Intermediate
349 if material_stream_1.connectedPorts.count() > 1 and material_stream_2.connectedPorts.count() > 1:
350 coordinates = {'x': material_stream_2.graphicObject.last().x, 'y': material_stream_2.graphicObject.last().y}
351 outlet_stream_1 = material_stream_1.split_stream()
352 outlet_stream_2 = material_stream_2.split_stream()
354 decision_node = outlet_stream_2.merge_parallel_streams(outlet_stream_1, coordinates=coordinates)
355 decision_node = material_stream_2.merge_parallel_streams(material_stream_1, decision_node)
357 update_decision_node_and_propagate(decision_node)
358 decision_node.save()
359 return decision_node
361 # Case 2: Connecting intermediate stream with product or feed
362 if material_stream_1.connectedPorts.count() > 1 or material_stream_2.connectedPorts.count() > 1:
363 inter_stream, connected_stream = (
364 (material_stream_1,
365 material_stream_2) if material_stream_1.connectedPorts.count() > material_stream_2.connectedPorts.count()
366 else (material_stream_2, material_stream_1)
367 )
368 connected_stream_port = connected_stream.connectedPorts.first()
369 is_inlet = (connected_stream_port.direction == ConType.Inlet)
371 if is_inlet:
372 # Feed -> Intermediate
373 decision_node = inter_stream.make_decision_node(num_inlets=1, num_outlets=2)
374 empty_port = decision_node.ports.filter(direction=ConType.Outlet, stream=None).first()
375 empty_port.stream = connected_stream
376 empty_port.save()
378 self.delete_empty_node(connected_stream, connected_stream_port)
380 # Center graphics
381 self.horizontally_center_graphic()
382 connected_stream.horizontally_center_graphic()
383 self.save()
384 connected_stream.save()
386 # Update compounds - First update decision node, then propagate
387 update_decision_node_and_propagate(decision_node)
388 return decision_node
389 else:
390 # Product -> Intermediate
391 decision_node = inter_stream.make_decision_node(num_inlets=2, num_outlets=1)
392 empty_port = decision_node.ports.filter(direction=ConType.Inlet, stream=None).first()
393 empty_port.stream = connected_stream
394 empty_port.save()
396 self.delete_empty_node(connected_stream, connected_stream_port)
398 # Center graphics
399 self.horizontally_center_graphic()
400 connected_stream.horizontally_center_graphic()
401 self.save()
402 connected_stream.save()
404 # Update compounds - First update decision node, then propagate
405 update_decision_node_and_propagate(decision_node)
406 return decision_node
408 # Case 3: Feed to feed or product to product
409 if material_stream_1_port.direction == material_stream_2_port.direction: 409 ↛ 410line 409 didn't jump to line 410 because the condition on line 409 was never true
410 result = material_stream_2.merge_parallel_streams(material_stream_1)
411 update_decision_node_and_propagate(result)
412 return result
414 # Case 4: Feed to product or product to feed
415 if material_stream_1_port.direction == "inlet":
416 inlet_stream, outlet_stream = material_stream_1, material_stream_2
417 inlet_port, outlet_port = material_stream_1_port, material_stream_2_port
418 else:
419 inlet_stream, outlet_stream = material_stream_2, material_stream_1
420 inlet_port, outlet_port = material_stream_2_port, material_stream_1_port
422 # Get all the graphic objects to preserve
424 # Always preserve the graphic of the stream that is being connected to
425 preserve_graphic = material_stream_2.graphicObject.last()
426 # Update compounds
427 if inlet_stream.objectType == SimulationObjectClass.EnergyStream: 427 ↛ 428line 427 didn't jump to line 428 because the condition on line 427 was never true
428 pass
429 elif inlet_stream.objectType != SimulationObjectClass.acStream: 429 ↛ 432line 429 didn't jump to line 432 because the condition on line 429 was always true
430 update_compounds_on_merge(inlet_stream, outlet_stream)
432 def update_graphic_object_on_merge(preserve_stream, delete_stream) -> None:
433 # Preserve one graphic object for each group either stream is connected to
434 # If the stream is connected to multiple groups, it will be shown in both groups
435 preserve_stream_groups = list(preserve_stream.get_groups())
436 preserve_stream_gobjs = preserve_stream.graphicObject.all()
437 merged_gobjs = []
438 for gobj in delete_stream.graphicObject.all():
439 # Check if there's a preserved stream in this group
440 preserved_gobj = next((g for g in preserve_stream_gobjs if g.group == gobj.group), None)
441 if preserved_gobj is not None:
442 # Keep this if
443 if preserve_stream == material_stream_2:
444 # Keep this graphic object's position
445 gobj.delete()
446 else:
447 # Keep the position of the stream to be deleted
448 preserved_gobj.copy_position_from(gobj)
449 gobj.delete()
450 # If both streams are shown in multiple parent groups, we only want to show the
451 # stream at the lowest level. make a list of these merged graphicsObjects so we can remove
452 # the extras later.
453 merged_gobjs.append(preserved_gobj)
454 else:
455 # connect this graphic object to the preserved stream
456 gobj.simulationObject = preserve_stream
457 gobj.save()
459 # Of the merged graphics objects, only keep the one in the lowest group
460 parent_groups: list[int] = []
462 for gobj in merged_gobjs:
463 parent_group = gobj.group.get_parent_group()
464 if parent_group is not None:
465 parent_groups.append(parent_group.pk)
467 for gobj in merged_gobjs:
468 if gobj.group.pk in parent_groups:
469 gobj.delete()
471 def _collect_unique_groups(*group_iters) -> set["Grouping"]:
472 """Return a set of unique Grouping objects from one or more iterables.
474 This preserves iteration order only in the sense that groups seen earlier
475 are added first, but the return value is a set because callers expect
476 membership semantics rather than ordering.
477 """
478 unique = set()
479 for iterable in group_iters:
480 for g in iterable:
481 if g not in unique:
482 unique.add(g)
483 return unique
485 # get all groups that are groups of either stream
486 inlet_stream_groups = list(inlet_stream.get_groups())
487 outlet_stream_groups = list(outlet_stream.get_groups())
488 intermediate_groups: set["Grouping"] = _collect_unique_groups(inlet_stream_groups, outlet_stream_groups)
490 from flowsheetInternals.unitops.models.delete_factory import DeleteFactory
491 if not outlet_stream.has_recycle_connection and inlet_stream.has_path_to(outlet_stream):
493 # preserve the outlet stream
494 preserve_stream = outlet_stream
495 inlet_port.stream = preserve_stream
496 inlet_port.save()
497 DeleteFactory.delete_object(inlet_stream)
499 update_graphic_object_on_merge(preserve_stream, inlet_stream)
501 # attach recycle block to the inlet stream
502 # this also handles updating property access
503 outlet_stream.attach_recycle(intermediate_groups)
504 inlet_stream.delete_control_values()
506 outlet_stream.reevaluate_properties_enabled()
507 else:
508 # Preserve the outlet stream
509 preserve_stream = outlet_stream
510 inlet_port.stream = preserve_stream
511 inlet_port.save()
512 DeleteFactory.delete_object(inlet_stream)
513 # Update graphic object
514 update_graphic_object_on_merge(preserve_stream, inlet_stream)
516 return preserve_stream
518 def delete_control_values(self) -> None:
519 """
520 Deletes all control values connected to properties in this object
521 """
522 property_set: PropertySet = self.properties
523 for prop in property_set.ContainedProperties.all():
524 for value in prop.values.all():
525 if value.is_control_set_point(): 525 ↛ 526line 525 didn't jump to line 526 because the condition on line 525 was never true
526 value.controlSetPoint.delete()
528 def attach_recycle(self, intermediate_groups: set["Grouping"] | None = None) -> None:
529 """
530 Attaches a new recycle block to the stream
531 """
532 if self.has_recycle_connection: 532 ↛ 533line 532 didn't jump to line 533 because the condition on line 532 was never true
533 return # already has a recycle connection
534 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
535 from flowsheetInternals.graphicData.models.groupingModel import Grouping
536 from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject
538 recycle = SimulationObjectFactory.create_simulation_object(
539 objectType="recycle",
540 flowsheet=self.flowsheet,
541 coordinates={
542 'x': self.graphicObject.last().x + self.graphicObject.last().width / 2,
543 'y': self.graphicObject.last().y + self.graphicObject.last().height / 2 + 100
544 },
545 )
547 # Set graphic objects for the recycle block in the same groups as the stream
548 default_graphic = recycle.graphicObject.last()
549 default_width = default_graphic.width if default_graphic else 32
550 default_height = default_graphic.height if default_graphic else 32
552 # Remove all graphic objects associated with the recycle so we can re-add them to proper groups
553 recycle.graphicObject.all().delete()
555 # If caller didn't provide groups, use this stream's groups
556 if intermediate_groups is None: 556 ↛ 557line 556 didn't jump to line 557 because the condition on line 556 was never true
557 intermediate_groups = set(self.get_groups())
558 else:
559 # ensure we have a set (in case a generator or queryset was passed)
560 intermediate_groups = set(intermediate_groups)
562 normalized_groups: set[Grouping] = set()
563 seen_group_ids: set[int] = set()
565 # Normalise groups to Grouping objects and remove duplicates
566 for group in intermediate_groups:
567 group_obj = None
568 if isinstance(group, Grouping): 568 ↛ 570line 568 didn't jump to line 570 because the condition on line 568 was always true
569 group_obj = group
570 elif group is not None:
571 group_obj = Grouping.objects.filter(pk=group).first()
573 if group_obj and group_obj.pk not in seen_group_ids: 573 ↛ 566line 573 didn't jump to line 566 because the condition on line 573 was always true
574 normalized_groups.add(group_obj)
575 seen_group_ids.add(group_obj.pk)
577 # Create graphic objects for each group
578 for group in normalized_groups:
579 stream_graphic = self.graphicObject.filter(group=group).last()
580 if stream_graphic is None:
581 continue
583 GraphicObject.objects.create(
584 flowsheet=self.flowsheet,
585 simulationObject=recycle,
586 width=default_width,
587 height=default_height,
588 x=stream_graphic.x + (stream_graphic.width - default_width) / 2,
589 y=stream_graphic.y + stream_graphic.height + 30,
590 group=group,
591 )
593 recycle.recycleData.update(self)
594 recycle.save()
596 def has_path_to(self, end_stream: "SimulationObject", check_recycles=True) -> bool:
597 """
598 Checks if there is a path in the flowsheet from the start stream (self) to the end stream
599 Can be used to check for loops in the flowsheet if these two streams are being merged
601 - param end_stream: The stream to check if there is a path to (from self)
602 - param check_recycles: If True, will skip the path if a stream has a recycle connection
603 """
604 remaining_streams = [self]
605 while remaining_streams:
606 current_stream = remaining_streams.pop()
607 if current_stream == end_stream:
608 # loop detected
609 return True
610 unit_op_port = current_stream.connectedPorts.filter(direction=ConType.Inlet).first()
611 if not unit_op_port:
612 continue
613 unit_op = unit_op_port.unitOp
614 # get the outlet ports of the unitop
615 connected_port_keys = get_connected_port_keys(unit_op_port.key, unit_op.schema)
616 outlet_ports = unit_op.ports.filter(
617 direction=ConType.Outlet,
618 key__in=connected_port_keys
619 )
620 for outlet_port in outlet_ports:
621 outlet_stream = outlet_port.stream
622 if outlet_stream is not None: 622 ↛ 620line 622 didn't jump to line 620 because the condition on line 622 was always true
623 if check_recycles and outlet_stream.has_recycle_connection: 623 ↛ 624line 623 didn't jump to line 624 because the condition on line 623 was never true
624 continue
625 remaining_streams.append(outlet_stream)
626 return False
628 def make_decision_node(self, num_inlets, num_outlets):
629 """
630 Turns a stream into a decision node with n inlet and m outlet ports
631 :param num_inlets: number of inlet ports to create
632 :param num_outlets: number of outlet ports to create
633 :return: Decision Node object
634 """
635 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
636 if self.objectType == SimulationObjectClass.Stream: 636 ↛ exitline 636 didn't return from function 'make_decision_node' because the condition on line 636 was always true
638 # Create Decision Node
639 modified_schema: ObjectType = configuration["decisionNode"].model_copy(deep=True)
640 modified_schema.ports["inlet"].default = num_inlets
641 modified_schema.ports["outlet"].default = num_outlets
642 graphicObject = self.graphicObject.last() # For now, we are assuming this only has one graphicObject.
643 parentGroup = self.graphicObject.last().group.id
644 decision_node = SimulationObjectFactory.create_simulation_object(
645 coordinates={'x': graphicObject.x, 'y': graphicObject.y},
646 objectType="decisionNode",
647 schema=modified_schema,
648 flowsheet=self.flowsheet,
649 create_attached_streams=False,
650 parentGroup=parentGroup,
651 )
653 if self.connectedPorts.count() > 1:
654 # Connect both streams to Decision Node
655 ms_outlet_port = self.connectedPorts.filter(direction=ConType.Inlet).first()
656 dn_outlet_port = decision_node.ports.filter(direction=ConType.Outlet).first()
657 dn_inlet_port = decision_node.ports.filter(direction=ConType.Inlet).first()
658 new_stream = SimulationObjectFactory.create_stream_at_port(port=dn_outlet_port)
659 ms_outlet_port.stream = new_stream
660 dn_inlet_port.stream = self
662 # Save Objects
663 ms_outlet_port.save()
664 dn_outlet_port.save()
665 dn_inlet_port.save()
666 decision_node.save()
668 # Center GraphicObjects
669 self.horizontally_center_graphic()
670 new_stream.horizontally_center_graphic()
671 self.save()
672 new_stream.save()
673 else:
674 # For now, only dealing with the case that a regular stream is initialized with one port
675 port = decision_node.ports.first()
676 port.stream = self
677 port.save()
678 self.horizontally_center_graphic()
679 self.save()
680 self.save()
681 decision_node.save()
682 # handle compounds for decision node
683 update_decision_node_and_propagate(decision_node, updated_via_right_click=True)
684 return decision_node
686 def update_compounds(self, compounds: list[str]) -> None:
687 """
688 Updates the compounds for this stream
689 """
690 update_compounds_on_set(self, compounds)
692 def add_port(self, key: str, existing_stream: "SimulationObject | None" = None) -> Port:
693 """
694 Adds a port to this object and adds a a new stream if none is provided
695 """
696 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
697 # Create ports
698 ports_schema = self.schema.ports
700 # replace any many=True ports with multiple ports
701 port_dict = ports_schema[key]
702 # get the next index for the port (looks at all ports with the same key and gets the length of the l
703 next_index = self.ports.filter(key=key).count()
704 new_port = Port(
705 key=key,
706 index=next_index,
707 direction=port_dict.type,
708 displayName=port_dict.displayName + f" {next_index + 1}",
709 unitOp=self,
710 flowsheet=self.flowsheet
711 )
712 new_port.save()
714 self.update_height()
716 if existing_stream:
717 # connect existing stream to the new port
718 new_port.stream = existing_stream
719 new_port.save() # might have to remove this trying to figure out why other streams are being removed
721 if self.objectType == "decisionNode": 721 ↛ 790line 721 didn't jump to line 790 because the condition on line 721 was always true
722 # Use existing function to update decision node compounds
723 update_decision_node_and_propagate(self)
724 else:
725 # create a new strea at the port using the factory
726 new_stream = SimulationObjectFactory.create_stream_at_port(new_port)
727 update_compounds_on_add_stream(new_port, new_stream)
728 outlet_name = self.schema.splitter_fraction_name
729 for property_key, property in self.schema.properties.items():
730 if property.indexSets is not None and ( 730 ↛ 788line 730 didn't jump to line 788 because the condition on line 730 was always true
731 'splitter_fraction' in property.indexSets) and new_port.direction == ConType.Outlet:
732 property_infos = self.properties.ContainedProperties.all()
733 property_info = property_infos.first()
734 new_indexed_item = IndexedItem.objects.create(
735 owner=self,
736 key="outlet_" + f"{next_index + 1}",
737 displayName=outlet_name + f" {next_index + 1}",
738 type="splitter_fraction",
739 flowsheet=self.flowsheet,
740 )
742 # get indexed set for split_fraction or priorities:
743 index_sets = property.indexSets
745 # figure out which other indexed items this should link to
746 indexed_item_sets: List[List[IndexedItem]] = []
747 for index in index_sets:
748 if index == IndexChoices.SplitterFraction:
749 continue # We don't need this, it's the one we're editing
750 indexed_items: List[IndexedItem] = self.get_indexed_items(
751 index) # e.g get_indexed_items("phase"), "compound", etc
752 indexed_item_sets.append(indexed_items)
754 # create a property value for each combination of indexed items, for the new outlet
755 combinations = list(itertools.product(*indexed_item_sets))
757 # Bulk update existing property values to be enabled
758 property_status = False
759 if self.objectType == "header" or self.objectType == "simple_header": 759 ↛ 760line 759 didn't jump to line 760 because the condition on line 759 was never true
760 property_status = True
761 else:
762 property_info.values.update(enabled=True)
764 # bulk create indexed items (all disabled as it's the last outlet)
765 index_links_to_create: List[PropertyValueIntermediate] = []
766 for combination in combinations:
767 # Create a propertyValue for this combination
768 combo_property_value = PropertyValue.objects.create(property=property_info,
769 enabled=property_status,
770 flowsheet=self.flowsheet)
771 # link it up to the new outlet
772 index_links_to_create.append(
773 PropertyValueIntermediate(propertyvalue=combo_property_value, indexeditem=new_indexed_item)
774 )
776 # PropertyValueIntermediate.objects.create(propertyvalue=combo_property_value, indexeditem=new_indexed_item)
777 # Also link it up to all the other sets
778 for indexed_item in combination:
779 # PropertyValueIntermediate.objects.create(propertyvalue=combo_property_value, indexeditem=indexed_item)
781 index_links_to_create.append(
782 PropertyValueIntermediate(propertyvalue=combo_property_value, indexeditem=indexed_item)
783 )
785 # Bulk create the links
786 PropertyValueIntermediate.objects.bulk_create(index_links_to_create)
788 new_stream.save()
790 return new_port
792 def update_height(self):
793 """
794 Updates the height of the graphic object based on the number of ports
795 """
796 if self.schema.graphicObject.autoHeight: # e.g header has auto height calculation 796 ↛ 797line 796 didn't jump to line 797 because the condition on line 796 was never true
797 max_ports = max(self.ports.filter(direction=ConType.Inlet).count(),
798 self.ports.filter(direction=ConType.Outlet).count())
799 self.graphicObject.update(height=max_ports * 200)
801 def get_indexed_items(self, index_set_type: IndexChoices) -> List[IndexedItem]:
802 match index_set_type:
803 case IndexChoices.Phase:
804 # get all the indexedItems that are type=phase
805 items = IndexedItem.objects.filter(owner=self, type=IndexChoices.Phase).all()
806 return items
807 case IndexChoices.Compound: 807 ↛ 810line 807 didn't jump to line 810 because the pattern on line 807 always matched
808 items = IndexedItem.objects.filter(owner=self, type=IndexChoices.Compound).all()
809 return items
810 case _:
811 raise ValueError("Get_indexed_items didn't expect this index set type")
813 def merge_decision_nodes(self, decision_node_active: "SimulationObject", decision_node_over: "SimulationObject") -> \
814 Optional["SimulationObject"]:
815 """
816 Merges this decision node with the over decision node and handles graphics positioning
817 """
818 import flowsheetInternals.unitops.models.delete_factory as DeleteFactory
820 # Get all streams from active node
821 inlet_streams = decision_node_active.ports.filter(direction="inlet").all()
822 outlet_streams = decision_node_active.ports.filter(direction="outlet").all()
824 # Transfer and reposition inlet streams
825 for inlet_port in inlet_streams:
826 stream = inlet_port.stream
827 # Add stream to new decision node
828 decision_node_over.add_port("inlet", stream)
830 # Update compounds and propagate
831 update_decision_node_and_propagate(decision_node_over)
833 # Transfer and reposition outlet streams
834 for outlet_port in outlet_streams:
835 stream = outlet_port.stream
836 # Add stream to new decision node
837 decision_node_over.add_port("outlet", stream)
839 # Delete the active node and its graphic object
840 DeleteFactory.delete_object(decision_node_active)
841 decision_node_over.save()
842 return decision_node_over
844 def reevaluate_properties_enabled(self) -> None:
845 """
846 Reevaluates property access for all properties in this object
848 This should only really be called when connections are changed,
849 otherwise the property enabling should be handled by adding
850 or removing control values.
851 """
852 if (self.objectType == SimulationObjectClass.MachineLearningBlock):
853 return # We don't want to change from the defaults
854 properties: list[PropertyInfo] = self.properties.ContainedProperties.all()
855 config = self.schema
856 config_properties = config.properties
857 config_groups = config.propertySetGroups
859 def _eval_enabled(prop: PropertyInfo, config_group) -> bool:
860 if self.is_stream():
861 # disable outlet/intermediate stream properties
862 ports = self.connectedPorts.all()
863 if len(ports) == 2 or (len(ports) == 1 and ports[0].direction == "outlet"):
864 return False
865 if config_group is None: 865 ↛ 866line 865 didn't jump to line 866 because the condition on line 865 was never true
866 return True # default to enabled, e.g for custom properties
867 if config_group.type == "stateVars":
868 state_vars = getattr(config_group, "stateVars") or ()
869 return prop.key in state_vars
870 return True # eg. All, default to enabled
872 list_prop_val = []
874 for prop in properties:
875 config_prop = config_properties.get(prop.key)
876 if config_prop: 876 ↛ 879line 876 didn't jump to line 879 because the condition on line 876 was always true
877 group = config_prop.propertySetGroup
878 else:
879 group = "default"
880 config_group = config_groups.get(group, None)
881 res = _eval_enabled(prop, config_group)
882 list_prop_val.extend(prop.enable(res))
884 PropertyValue.objects.bulk_update(list_prop_val, ["enabled"])
886 def get_unspecified_properties(self) -> list:
887 if not hasattr(self, "properties") or not self.properties: 887 ↛ 888line 887 didn't jump to line 888 because the condition on line 887 was never true
888 return []
890 contained_properties: models.QuerySet[PropertyInfo] = self.properties.ContainedProperties.all()
891 is_splitter = getattr(self.schema, "displayType", "").lower() == "splitter"
893 # use schema stateVars to find required properties
894 required_properties: Set[str] = set()
895 for group in self.schema.propertySetGroups.values():
896 if group.toggle and not self.properties.get_property(group.toggle).get_value(): 896 ↛ 898line 896 didn't jump to line 898 because the condition on line 896 was never true
897 # The group is toggled off, so we don't care that the properties are not specified
898 continue
899 if group.type == "stateVars" or group.type == "composition" or group.type == "exceptLast": 899 ↛ 895line 899 didn't jump to line 895 because the condition on line 899 was always true
900 required_properties.update(group.stateVars)
902 # use pre fetched data to avoid additional queries
903 unspecified_properties = []
904 property_info: PropertyInfo
905 for property_info in contained_properties:
906 # check if the property has no values or if all values are invalid
907 has_valid_value = property_info.isSpecified()
908 if not has_valid_value and property_info.key in required_properties:
909 unspecified_properties.append(property_info.key)
911 # check if mole_frac_comp sums to 1
912 mole_frac_props = [x for x in contained_properties if x.key == "mole_frac_comp"]
914 for prop in mole_frac_props:
915 if prop.has_value_bulk():
916 total = 0.0
917 for value in prop.values.all():
918 try:
919 val = float(value.value)
920 total += val
921 except (ValueError, TypeError):
922 continue
923 if abs(total - 1.0) > 0.001 and "mole_frac_comp" not in unspecified_properties: 923 ↛ 924line 923 didn't jump to line 924 because the condition on line 923 was never true
924 unspecified_properties.append("mole_frac_comp")
925 else:
926 if "mole_frac_comp" not in unspecified_properties:
927 unspecified_properties.append("mole_frac_comp")
929 return unspecified_properties
931 def delete(self, *args, **kwargs):
932 raise NotImplementedError("Use delete_object method from DeleteFactory")
934 def permanently_delete(self, *args, **kwargs):
935 """
936 Permanently deletes the object from the database.
937 This should only be used in tests or when you are sure you want to delete the object.
938 """
939 super().delete(*args, **kwargs)
941 def delete_empty_node(self, connected_stream: "SimulationObject", connected_stream_port: "Port"):
942 """
943 Deletes the empty_port stream from parent groups when creating a decision node inside a group.
944 :param connected_stream: Stream connected to the empty port
945 :param connected_stream_port: Port of the connected stream
946 """
947 simulation_object_id = connected_stream.id
948 current_group = connected_stream_port.unitOp.get_group()
949 parent_groups = connected_stream_port.unitOp.get_parent_groups()
951 for parent_group in parent_groups:
952 if parent_group != current_group:
953 gobjs_in_group = connected_stream.graphicObject.filter(group=parent_group)
954 for graphic_object in gobjs_in_group:
955 if graphic_object.simulationObject.id == simulation_object_id: 955 ↛ 954line 955 didn't jump to line 954 because the condition on line 955 was always true
956 graphic_object.delete()