Coverage for backend/flowsheetInternals/unitops/models/SimulationObject.py: 83%
465 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1from django.db import models
2from core.managers import SoftDeleteManager
3from core.auxiliary.enums.unitOpData import SimulationObjectClass
4from core.auxiliary.models.Flowsheet import Flowsheet
5from core.auxiliary.models.PropertySet import PropertySet
6from core.auxiliary.models.PropertyInfo import PropertyInfo
7from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate
8from core.auxiliary.models.IndexedItem import IndexedItem, IndexChoices
9from typing import Set
12from flowsheetInternals.propertyPackages.models.SimulationObjectPropertyPackages import SimulationObjectPropertyPackages
13from flowsheetInternals.unitops.models.Port import Port
14from typing import Iterable
15from flowsheetInternals.unitops.config.config_methods import *
16from common.config_types import *
17import itertools
19from .compound_propogation import update_compounds_on_set, update_compounds_on_merge, _get_compound_keys, update_decision_node_and_propagate, update_compounds_on_add_stream
20from typing import Optional, List
21from ..methods.add_expression import add_expression as _add_expression
24class SimulationObject(models.Model):
25 flowsheet = models.ForeignKey(Flowsheet, on_delete=models.CASCADE, related_name="flowsheetObjects")
26 componentName = models.CharField(max_length=64)
27 objectType = models.CharField(choices= SimulationObjectClass.choices)
29 created_at = models.DateTimeField(auto_now_add=True)
30 is_deleted = models.BooleanField(default=False)
31 initial_values = models.JSONField(null=True, blank=True)
33 #add a soft delete manager
34 objects = SoftDeleteManager()
35 add_expression = _add_expression
37 @property
38 def schema(self) -> ObjectType:
39 return get_object_schema(self)
41 @property
42 def has_recycle_connection(self) -> bool:
43 return hasattr(self, "recycleConnection")
45 def is_stream(self) -> bool:
46 return self.schema.is_stream
48 def get_stream(self, key: str, index: int = 0) -> "SimulationObject":
49 """
50 Returns the stream attached to the port with the given key
51 """
52 port = self.get_port(key, index)
53 stream = port.stream
54 return stream
56 def get_group(self) -> "Grouping":
57 """
58 Returns the group that this object belongs to
59 """
60 if not self.is_stream(): 60 ↛ 65line 60 didn't jump to line 65 because the condition on line 60 was always true
61 # There is only one graphic object, so we can just return the group
62 return self.graphicObject.last().group
63 else:
64 # Streams arent' really in a group, throw an error
65 raise ValueError("Streams do not belong to a group")
67 def get_groups(self) -> Iterable["Grouping"]:
68 """
69 Returns an iterable of groups that this object belongs to. Mostly needed for streams that have many groups.
70 """
71 return (graphic.group for graphic in self.graphicObject.all())
73 def get_parent_groups(self) -> List["Grouping"]:
74 """
75 Returns an iterable of parent groups that this object belongs to.
76 """
77 parent_groups = []
78 current_group = self.get_group()
79 while current_group:
80 parent_groups.append(current_group)
81 simulationObject = current_group.simulationObject
82 if simulationObject is None: 82 ↛ 83line 82 didn't jump to line 83 because the condition on line 82 was never true
83 break
84 current_group = simulationObject.get_group()
85 return parent_groups
87 def get_property_package(self, name: str | None = None) -> SimulationObjectPropertyPackages:
88 """
89 Returns the property package slot with the given name
90 """
91 property_package_slot: SimulationObjectPropertyPackages
92 if name is None:
93 property_package_slot = self.propertyPackages.first()
94 else:
95 property_package_slot = self.propertyPackages.get(name=name)
96 return property_package_slot
99 def set_property_package(self, property_package, name: str | None = None) -> None:
100 """
101 Sets the property package for this object
102 """
103 property_package_slot = self.get_property_package(name)
104 property_package_slot.propertyPackage = property_package
105 property_package_slot.save()
108 def get_port(self, key: str, index: int = 0) -> Port:
109 """
110 Returns the port with the given key
111 """
112 try:
114 port: Port = self.ports.get(key=key, index=index)
115 return port
116 except Port.DoesNotExist:
117 raise ValueError(f"Port with key {key} does not exist on object {self.componentName}")
120 def reorder_object_ports(self):
121 """
122 Reorders port mappings by connected unit operation y position
123 :return: None
124 """
125 inlet_connections = [port for port in self.ports.filter(direction=ConType.Inlet).all()]
126 outlet_connections = [port for port in self.ports.filter(direction=ConType.Outlet).all()]
127 inlet_connections.sort(key=lambda port: port.stream.connectedPorts.get(direction=ConType.Outlet).unitOp.graphicObject.last().y)
128 outlet_connections.sort(key=lambda port: port.stream.connectedPorts.get(direction=ConType.Inlet).unitOp.graphicObject.last().y)
129 for i, port in enumerate(inlet_connections):
130 port.index = i
131 port.save()
132 for i, port in enumerate(outlet_connections):
133 port.index = i
134 port.save()
135 self.save()
138 def horizontally_center_graphic(self) -> None:
139 """
140 Horizontally centers GraphicObject for a stream or unit operation
141 Currently uni-directional - left to right connections
142 Centers based on inlet and outlet graphics.
143 :return: None
144 """
145 # TODO: Make this work as appropriate for streams with multiple graphic objects
146 # Is an Intermediate Stream
147 if self.objectType == SimulationObjectClass.Stream or SimulationObjectClass.EnergyStream or SimulationObjectClass.acStream and self.connectedPorts.count() > 1: 147 ↛ 158line 147 didn't jump to line 158 because the condition on line 147 was always true
148 inlet, outlet = self.connectedPorts.filter(direction=ConType.Outlet).first(), self.connectedPorts.filter(direction=ConType.Inlet).first()
149 inlet_x, outlet_x = inlet.unitOp.graphicObject.last().x, outlet.unitOp.graphicObject.last().x
150 if inlet_x <= outlet_x:
151 self.graphicObject.last().x = abs((inlet_x + (inlet.unitOp.graphicObject.last().width)) + (outlet_x)) / 2
152 else:
153 self.graphicObject.last().x = abs(inlet_x + (outlet_x + outlet.unitOp.graphicObject.last().width)) / 2
154 # Flip Graphic Object horizontally
155 self.graphicObject.last().save()
156 self.save()
157 else:
158 inlet, outlet = self.ports.filter(direction=ConType.Inlet).first(), self.ports.filter(direction=ConType.Outlet).first()
159 self.graphicObject.last().x = abs((inlet.stream.graphicObject.last().x + outlet.stream.graphicObject.last().x) / 2)
160 self.graphicObject.last().save()
161 self.save()
162 return
165 def vertically_center_graphic(self) -> None:
166 """
167 Vertically centers GraphicObject for a stream or unit operation
168 :return: None
169 """
170 # TODO: Make this work as appropriate for streams with multiple graphic objects
171 # Is an intermediate stream
172 if self.objectType == SimulationObjectClass.Stream or SimulationObjectClass.EnergyStream or SimulationObjectClass.acStream and self.connectedPorts.count() > 1:
173 inlet, outlet = self.connectedPorts.filter(direction=ConType.Inlet).first(), self.connectedPorts.filter(direction=ConType.Outlet).first()
174 self.graphicObject.last().y = abs((inlet.unitOp.graphicObject.last().y + outlet.unitOp.graphicObject.last().y) / 2)
175 self.graphicObject.last().save()
176 self.save()
177 else:
178 inlet, outlet = self.ports.filter(direction=ConType.Inlet).first(), self.ports.filter(direction=ConType.Outlet).all()[1]
179 self.graphicObject.last().y = abs((inlet.stream.graphicObject.last().y + outlet.stream.graphicObject.last().y) / 2)
180 self.graphicObject.last().save()
181 self.save()
182 return
185 def split_stream(self) -> "SimulationObject":
186 """
187 Splits a stream into two separate streams (one inlet and one outlet - disconnected).
188 :return: New Stream Object (outlet stream) if object is a stream.
189 """
190 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
191 from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject
193 new_stream = None
194 if self.objectType == SimulationObjectClass.Stream or SimulationObjectClass.EnergyStream or SimulationObjectClass.acStream: 194 ↛ 265line 194 didn't jump to line 265 because the condition on line 194 was always true
195 connectedPorts = self.connectedPorts.all()
196 inlet_port : Port = connectedPorts.get(direction=ConType.Inlet)
197 outlet_port : Port = connectedPorts.get(direction=ConType.Outlet)
198 new_stream = SimulationObjectFactory.create_stream_at_port(inlet_port)
199 new_stream.save()
200 # reset the stream to default position
201 coordinates = SimulationObjectFactory.default_stream_position(outlet_port.unitOp, outlet_port)
202 stream_graphic_object = self.graphicObject.last()
203 stream_graphic_object.x = coordinates['x'] - stream_graphic_object.width / 2
204 stream_graphic_object.y = coordinates['y'] - stream_graphic_object.height / 2
205 stream_graphic_object.save()
208 """
209 If a stream connects two groups together, it will have graphic objects in each of those groups.
210 This next section figures out which graphic object to keep, and which to move to the new stream.
211 """
212 all_graphic_objects = self.graphicObject.all()
213 default_graphic_object = new_stream.graphicObject.last() # a default graphic object is created by create_stream_at_port
214 # The graphic objects in these groups should be kept
215 groups_to_keep : List[int] = [g.id for g in outlet_port.unitOp.get_parent_groups()]
216 groups_to_move : List[int] = [g.id for g in inlet_port.unitOp.get_parent_groups()]
219 for gobj in all_graphic_objects:
220 if gobj.group.id in groups_to_keep:
221 # keep it here
222 if gobj.group.id in groups_to_move:
223 # move the default graphic object into this group, so both the inlet and outlet show in this group
224 default_graphic_object.group = gobj.group
225 default_graphic_object.save()
226 else:
227 if gobj.group.id not in groups_to_move: 227 ↛ 228line 227 didn't jump to line 228 because the condition on line 227 was never true
228 print("Error: this should be in either ggroups to move or the other", gobj.group)
229 # must be in groups_to_move
230 # move graphic object to this stream
231 gobj.simulationObject = new_stream
232 gobj.save()
234 # make sure both the original and new streams are shown in all relevant groups
235 for graphic_object in self.graphicObject.all():
236 current_group = graphic_object.group
237 while current_group:
238 parent_group = current_group.get_parent_group()
239 if parent_group:
240 # make sure the original stream is shown in the parent group
241 if not self.graphicObject.filter(group=parent_group).exists(): 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true
242 GraphicObject.objects.create(
243 flowsheet=self.flowsheet,
244 simulationObject=self,
245 width=graphic_object.width,
246 height=graphic_object.height,
247 x=graphic_object.x,
248 y=graphic_object.y,
249 group=parent_group,
250 )
252 # make sure the new stream is shown in the parent group
253 if not new_stream.graphicObject.filter(group=parent_group).exists(): 253 ↛ 254line 253 didn't jump to line 254 because the condition on line 253 was never true
254 GraphicObject.objects.create(
255 flowsheet=self.flowsheet,
256 simulationObject=new_stream,
257 width=graphic_object.width,
258 height=graphic_object.height,
259 x=graphic_object.x,
260 y=graphic_object.y,
261 group=parent_group
262 )
263 current_group = parent_group
265 return new_stream
267 def merge_parallel_streams(self, connected_stream: "SimulationObject", decision_node=None, coordinates=None) -> "SimulationObject":
268 """
269 Merges two streams that have the same direction (Both Inlet or Outlet)
270 :param connected_stream: Stream to connect
271 :param decision_node: Decision Node object to merges streams into (In the case of n-inlet / n-outlet)
272 :return: Decision Node Object
273 """
274 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
275 is_inlet = self.connectedPorts.first().direction == ConType.Inlet
276 direction = ConType.Outlet if is_inlet else ConType.Inlet
277 if decision_node is None:
278 modified_schema: ObjectType = configuration["decisionNode"].model_copy(deep=True)
279 if(is_inlet): 279 ↛ 283line 279 didn't jump to line 283 because the condition on line 279 was always true
280 modified_schema.ports["outlet"].default = 2
281 modified_schema.ports["inlet"].default = 2
282 else:
283 modified_schema.ports["inlet"].default = 2
284 modified_schema.ports["outlet"].default = 2
285 decision_node = SimulationObjectFactory.create_simulation_object(
286 coordinates={'x': self.graphicObject.last().x, 'y': self.graphicObject.last().y} if coordinates is None else coordinates,
287 objectType="decisionNode",
288 schema=modified_schema,
289 flowsheet=self.flowsheet,
290 create_attached_streams=False,
291 )
292 decision_node.graphicObject.last().rotation = self.connectedPorts.first().unitOp.graphicObject.last().rotation
293 decision_node.graphicObject.last().save()
294 ports = decision_node.ports.filter(direction=direction).all()
295 port1 = ports[0]
296 port2 = ports[1]
297 port1.stream = self
298 port2.stream = connected_stream
299 port1.save()
300 port2.save()
302 decision_node.save()
304 # Center GraphicObjects
305 self.horizontally_center_graphic()
306 connected_stream.horizontally_center_graphic()
307 self.save()
308 connected_stream.save()
310 return decision_node
313 def merge_stream(self, connectStream: "SimulationObject") -> "SimulationObject":
314 """
315 Connects this object (a stream) to another stream
316 :param connectedStream: Stream to connect to this object
317 :return: Decision Node Object
318 """
319 material_stream_1 = self
320 material_stream_2 = connectStream
322 material_stream_1_port = material_stream_1.connectedPorts.first()
323 material_stream_2_port = material_stream_2.connectedPorts.first()
325 # Case 1: Intermediate to Intermediate
326 if material_stream_1.connectedPorts.count() > 1 and material_stream_2.connectedPorts.count() > 1:
327 coordinates = {'x': material_stream_2.graphicObject.last().x, 'y': material_stream_2.graphicObject.last().y}
328 outlet_stream_1 = material_stream_1.split_stream()
329 outlet_stream_2 = material_stream_2.split_stream()
331 decision_node = outlet_stream_2.merge_parallel_streams(outlet_stream_1, coordinates=coordinates)
332 decision_node = material_stream_2.merge_parallel_streams(material_stream_1, decision_node)
334 update_decision_node_and_propagate(decision_node)
335 decision_node.save()
336 return decision_node
338 # Case 2: Connecting intermediate stream with product or feed
339 if material_stream_1.connectedPorts.count() > 1 or material_stream_2.connectedPorts.count() > 1:
340 inter_stream, connected_stream = (
341 (material_stream_1, material_stream_2) if material_stream_1.connectedPorts.count() > material_stream_2.connectedPorts.count()
342 else (material_stream_2, material_stream_1)
343 )
344 connected_stream_port = connected_stream.connectedPorts.first()
345 is_inlet = (connected_stream_port.direction == ConType.Inlet)
347 if is_inlet:
348 # Feed -> Intermediate
349 decision_node = inter_stream.make_decision_node(num_inlets=1, num_outlets=2)
350 empty_port = decision_node.ports.filter(direction=ConType.Outlet, stream=None).first()
351 empty_port.stream = connected_stream
352 empty_port.save()
354 # Center graphics
355 self.horizontally_center_graphic()
356 connected_stream.horizontally_center_graphic()
357 self.save()
358 connected_stream.save()
360 # Update compounds - First update decision node, then propagate
361 update_decision_node_and_propagate(decision_node)
362 return decision_node
363 else:
364 # Product -> Intermediate
365 decision_node = inter_stream.make_decision_node(num_inlets=2, num_outlets=1)
366 empty_port = decision_node.ports.filter(direction=ConType.Inlet, stream=None).first()
367 empty_port.stream = connected_stream
368 empty_port.save()
370 # Center graphics
371 self.horizontally_center_graphic()
372 connected_stream.horizontally_center_graphic()
373 self.save()
374 connected_stream.save()
376 # Update compounds - First update decision node, then propagate
377 update_decision_node_and_propagate(decision_node)
378 return decision_node
380 # Case 3: Feed to feed or product to product
381 if material_stream_1_port.direction == material_stream_2_port.direction: 381 ↛ 382line 381 didn't jump to line 382 because the condition on line 381 was never true
382 result = material_stream_2.merge_parallel_streams(material_stream_1)
383 update_decision_node_and_propagate(result)
384 return result
386 # Case 4: Feed to product or product to feed
387 if material_stream_1_port.direction == "inlet":
388 inlet_stream, outlet_stream = material_stream_1, material_stream_2
389 inlet_port, outlet_port = material_stream_1_port, material_stream_2_port
390 else:
391 inlet_stream, outlet_stream = material_stream_2, material_stream_1
392 inlet_port, outlet_port = material_stream_2_port, material_stream_1_port
394 # Get all the graphic objects to preserve
396 # Always preserve the graphic of the stream that is being connected to
397 preserve_graphic = material_stream_2.graphicObject.last()
398 # Update compounds
399 if inlet_stream.objectType == SimulationObjectClass.EnergyStream: 399 ↛ 400line 399 didn't jump to line 400 because the condition on line 399 was never true
400 pass
401 elif inlet_stream.objectType != SimulationObjectClass.acStream: 401 ↛ 404line 401 didn't jump to line 404 because the condition on line 401 was always true
402 update_compounds_on_merge(inlet_stream, outlet_stream)
404 def update_graphic_object_on_merge(preserve_stream,delete_stream) -> None:
405 # Preserve one graphic object for each group either stream is connected to
406 # If the stream is connected to multiple groups, it will be shown in both groups
407 preserve_stream_groups = list(preserve_stream.get_groups())
408 preserve_stream_gobjs = preserve_stream.graphicObject.all()
409 merged_gobjs = []
410 for gobj in delete_stream.graphicObject.all():
411 # Check if there's a preserved stream in this group
412 preserved_gobj = next((g for g in preserve_stream_gobjs if g.group == gobj.group), None)
413 if preserved_gobj is not None:
414 # Keep this if
415 if preserve_stream == material_stream_2:
416 # Keep this graphic object's position
417 gobj.delete()
418 else:
419 # Keep the position of the stream to be deleted
420 preserved_gobj.copy_position_from(gobj)
421 gobj.delete()
422 # If both streams are shown in multiple parent groups, we only want to show the
423 # stream at the lowest level. make a list of these merged graphicsObjects so we can remove
424 # the extras later.
425 merged_gobjs.append(preserved_gobj)
426 else:
427 # connect this graphic object to the preserved stream
428 gobj.simulationObject = preserve_stream
429 gobj.save()
431 # Of the merged graphics objects, only keep the one in the lowest group
432 parent_groups: list[int] = []
434 for gobj in merged_gobjs:
435 parent_group = gobj.group.get_parent_group()
436 if parent_group is not None:
437 parent_groups.append(parent_group.pk)
439 for gobj in merged_gobjs:
440 if gobj.group.pk in parent_groups:
441 pass
442 gobj.delete()
444 from flowsheetInternals.unitops.models.delete_factory import DeleteFactory
445 if not outlet_stream.has_recycle_connection and inlet_stream.has_path_to(outlet_stream):
446 # preserve the outlet stream
447 preserve_stream = outlet_stream
448 inlet_port.stream = preserve_stream
449 inlet_port.save()
450 DeleteFactory.delete_object(inlet_stream)
452 update_graphic_object_on_merge(preserve_stream, inlet_stream)
454 # attach recycle block to the inlet stream
455 # this also handles updating property access
456 outlet_stream.attach_recycle()
457 inlet_stream.delete_control_values()
459 outlet_stream.reevaluate_properties_enabled()
460 else:
461 # Preserve the outlet stream
462 preserve_stream = outlet_stream
463 inlet_port.stream = preserve_stream
464 inlet_port.save()
465 DeleteFactory.delete_object(inlet_stream)
466 # Update graphic object
467 update_graphic_object_on_merge(preserve_stream, inlet_stream)
469 return preserve_stream
471 def delete_control_values(self) -> None:
472 """
473 Deletes all control values connected to properties in this object
474 """
475 property_set: PropertySet = self.properties
476 for prop in property_set.ContainedProperties.all():
477 for value in prop.values.all():
478 if value.is_control_set_point(): 478 ↛ 479line 478 didn't jump to line 479 because the condition on line 478 was never true
479 value.controlSetPoint.delete()
482 def attach_recycle(self) -> None:
483 """
484 Attaches a new recycle block to the stream
485 """
486 if self.has_recycle_connection: 486 ↛ 487line 486 didn't jump to line 487 because the condition on line 486 was never true
487 return # already has a recycle connection
488 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
489 recycle = SimulationObjectFactory.create_simulation_object(
490 objectType="recycle",
491 flowsheet=self.flowsheet,
492 coordinates={
493 'x': self.graphicObject.last().x + self.graphicObject.last().width / 2,
494 'y': self.graphicObject.last().y + self.graphicObject.last().height / 2 + 100
495 },
496 )
497 recycle.recycleData.update(self)
498 recycle.save()
501 def has_path_to(self, end_stream: "SimulationObject", check_recycles=True) -> bool:
502 """
503 Checks if there is a path in the flowsheet from the start stream (self) to the end stream
504 Can be used to check for loops in the flowsheet if these two streams are being merged
506 - param end_stream: The stream to check if there is a path to (from self)
507 - param check_recycles: If True, will skip the path if a stream has a recycle connection
508 """
509 remaining_streams = [self]
510 while remaining_streams:
511 current_stream = remaining_streams.pop()
512 if current_stream == end_stream:
513 # loop detected
514 return True
515 unit_op_port = current_stream.connectedPorts.filter(direction=ConType.Inlet).first()
516 if not unit_op_port:
517 continue
518 unit_op = unit_op_port.unitOp
519 # get the outlet ports of the unitop
520 connected_port_keys = get_connected_port_keys(unit_op_port.key, unit_op.schema)
521 outlet_ports = unit_op.ports.filter(
522 direction=ConType.Outlet,
523 key__in=connected_port_keys
524 )
525 for outlet_port in outlet_ports:
526 outlet_stream = outlet_port.stream
527 if outlet_stream is not None: 527 ↛ 525line 527 didn't jump to line 525 because the condition on line 527 was always true
528 if check_recycles and outlet_stream.has_recycle_connection: 528 ↛ 529line 528 didn't jump to line 529 because the condition on line 528 was never true
529 continue
530 remaining_streams.append(outlet_stream)
531 return False
534 def make_decision_node(self, num_inlets, num_outlets):
535 """
536 Turns a stream into a decision node with n inlet and m outlet ports
537 :param num_inlets: number of inlet ports to create
538 :param num_outlets: number of outlet ports to create
539 :return: Decision Node object
540 """
541 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
542 if self.objectType == SimulationObjectClass.Stream: 542 ↛ exitline 542 didn't return from function 'make_decision_node' because the condition on line 542 was always true
544 # Create Decision Node
545 modified_schema: ObjectType = configuration["decisionNode"].model_copy(deep=True)
546 modified_schema.ports["inlet"].default = num_inlets
547 modified_schema.ports["outlet"].default = num_outlets
548 graphicObject = self.graphicObject.last() # For now, we are assuming this only has one graphicObject.
549 decision_node = SimulationObjectFactory.create_simulation_object(
550 coordinates={'x': graphicObject.x, 'y': graphicObject.y},
551 objectType="decisionNode",
552 schema=modified_schema,
553 flowsheet=self.flowsheet,
554 create_attached_streams=False,
555 )
558 if self.connectedPorts.count() > 1:
559 # Connect both streams to Decision Node
560 ms_outlet_port = self.connectedPorts.filter(direction=ConType.Inlet).first()
561 dn_outlet_port = decision_node.ports.filter(direction=ConType.Outlet).first()
562 dn_inlet_port = decision_node.ports.filter(direction=ConType.Inlet).first()
563 new_stream = SimulationObjectFactory.create_stream_at_port(port=dn_outlet_port)
564 ms_outlet_port.stream = new_stream
565 dn_inlet_port.stream = self
567 # Save Objects
568 ms_outlet_port.save()
569 dn_outlet_port.save()
570 dn_inlet_port.save()
571 decision_node.save()
573 # Center GraphicObjects
574 self.horizontally_center_graphic()
575 new_stream.horizontally_center_graphic()
576 self.save()
577 new_stream.save()
578 else:
579 # For now, only dealing with the case that a regular stream is initialized with one port
580 port = decision_node.ports.first()
581 port.stream = self
582 port.save()
583 self.horizontally_center_graphic()
584 self.save()
585 self.save()
586 decision_node.save()
587 #handle compounds for decision node
588 update_decision_node_and_propagate(decision_node, updated_via_right_click=True)
589 return decision_node
592 def update_compounds(self, compounds: list[str]) -> None:
593 """
594 Updates the compounds for this stream
595 """
596 update_compounds_on_set(self, compounds)
599 def add_port(self, key: str, existing_stream: "SimulationObject | None" = None) -> Port:
600 """
601 Adds a port to this object and adds a a new stream if none is provided
602 """
603 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
604 # Create ports
605 ports_schema = self.schema.ports
607 # replace any many=True ports with multiple ports
608 port_dict = ports_schema[key]
609 # get the next index for the port (looks at all ports with the same key and gets the length of the l
610 next_index = self.ports.filter(key=key).count()
611 new_port = Port(
612 key=key,
613 index=next_index,
614 direction=port_dict.type,
615 displayName=port_dict.displayName + f" {next_index+1}",
616 unitOp=self,
617 flowsheet=self.flowsheet
618 )
619 new_port.save()
621 self.update_height()
624 if existing_stream:
625 #connect existing stream to the new port
626 new_port.stream = existing_stream
627 new_port.save() #might have to remove this trying to figure out why other streams are being removed
629 if self.objectType == "decisionNode": 629 ↛ 696line 629 didn't jump to line 696 because the condition on line 629 was always true
630 # Use existing function to update decision node compounds
631 update_decision_node_and_propagate(self)
632 else:
633 #create a new strea at the port using the factory
634 new_stream = SimulationObjectFactory.create_stream_at_port(new_port)
635 update_compounds_on_add_stream(new_port, new_stream)
636 outlet_name = self.schema.splitter_fraction_name
637 for property_key, property in self.schema.properties.items():
638 if property.indexSets is not None and ('splitter_fraction' in property.indexSets) and new_port.direction == ConType.Outlet: 638 ↛ 693line 638 didn't jump to line 693 because the condition on line 638 was always true
639 property_infos = self.properties.ContainedProperties.all()
640 property_info = property_infos.first()
641 new_indexed_item = IndexedItem.objects.create(
642 owner=self,
643 key="outlet_" + f"{next_index+1}",
644 displayName= outlet_name + f" {next_index+1}",
645 type="splitter_fraction",
646 flowsheet=self.flowsheet,
647 )
649 # get indexed set for split_fraction or priorities:
650 index_sets = property.indexSets
652 # figure out which other indexed items this should link to
653 indexed_item_sets : List[List[IndexedItem]]= []
654 for index in index_sets:
655 if index == IndexChoices.SplitterFraction:
656 continue # We don't need this, it's the one we're editing
657 indexed_items : List[IndexedItem] = self.get_indexed_items(index) # e.g get_indexed_items("phase"), "compound", etc
658 indexed_item_sets.append(indexed_items)
660 # create a property value for each combination of indexed items, for the new outlet
661 combinations = list(itertools.product(*indexed_item_sets))
663 # Bulk update existing property values to be enabled
664 property_status = False
665 if self.objectType == "header" or self.objectType == "simple_header": 665 ↛ 666line 665 didn't jump to line 666 because the condition on line 665 was never true
666 property_status = True
667 else:
668 property_info.values.update(enabled=True)
671 # bulk create indexed items (all disabled as it's the last outlet)
672 index_links_to_create : List[PropertyValueIntermediate] = []
673 for combination in combinations:
674 # Create a propertyValue for this combination
675 combo_property_value = PropertyValue.objects.create(property=property_info, enabled=property_status, flowsheet=self.flowsheet)
676 # link it up to the new outlet
677 index_links_to_create.append(
678 PropertyValueIntermediate(propertyvalue=combo_property_value, indexeditem=new_indexed_item)
679 )
681 #PropertyValueIntermediate.objects.create(propertyvalue=combo_property_value, indexeditem=new_indexed_item)
682 # Also link it up to all the other sets
683 for indexed_item in combination:
684 #PropertyValueIntermediate.objects.create(propertyvalue=combo_property_value, indexeditem=indexed_item)
686 index_links_to_create.append(
687 PropertyValueIntermediate(propertyvalue=combo_property_value, indexeditem=indexed_item)
688 )
690 # Bulk create the links
691 PropertyValueIntermediate.objects.bulk_create(index_links_to_create)
693 new_stream.save()
696 return new_port
698 def update_height(self):
699 """
700 Updates the height of the graphic object based on the number of ports
701 """
702 if self.schema.graphicObject.autoHeight: # e.g header has auto height calculation 702 ↛ 703line 702 didn't jump to line 703 because the condition on line 702 was never true
703 max_ports = max(self.ports.filter(direction=ConType.Inlet).count(), self.ports.filter(direction=ConType.Outlet).count())
704 self.graphicObject.update(height=max_ports * 200)
707 def get_indexed_items(self,index_set_type : IndexChoices) -> List[IndexedItem]:
708 match index_set_type:
709 case IndexChoices.Phase:
710 # get all the indexedItems that are type=phase
711 items = IndexedItem.objects.filter(owner=self, type=IndexChoices.Phase).all()
712 return items
713 case IndexChoices.Compound: 713 ↛ 716line 713 didn't jump to line 716 because the pattern on line 713 always matched
714 items = IndexedItem.objects.filter(owner=self, type=IndexChoices.Compound).all()
715 return items
716 case _:
717 raise ValueError("Get_indexed_items didn't expect this index set type")
721 def merge_decision_nodes(self, decision_node_active: "SimulationObject", decision_node_over: "SimulationObject") -> Optional["SimulationObject"]:
722 """
723 Merges this decision node with the over decision node and handles graphics positioning
724 """
725 import flowsheetInternals.unitops.models.delete_factory as DeleteFactory
727 # Get all streams from active node
728 inlet_streams = decision_node_active.ports.filter(direction="inlet").all()
729 outlet_streams = decision_node_active.ports.filter(direction="outlet").all()
731 # Transfer and reposition inlet streams
732 for inlet_port in inlet_streams:
733 stream = inlet_port.stream
734 # Add stream to new decision node
735 decision_node_over.add_port("inlet", stream)
737 # Update compounds and propagate
738 update_decision_node_and_propagate(decision_node_over)
740 # Transfer and reposition outlet streams
741 for outlet_port in outlet_streams:
742 stream = outlet_port.stream
743 # Add stream to new decision node
744 decision_node_over.add_port("outlet", stream)
746 # Delete the active node and its graphic object
747 DeleteFactory.delete_object(decision_node_active)
748 decision_node_over.save()
749 return decision_node_over
752 def reevaluate_properties_enabled(self) -> None:
753 """
754 Reevaluates property access for all properties in this object
756 This should only really be called when connections are changed,
757 otherwise the property enabling should be handled by adding
758 or removing control values.
759 """
760 if(self.objectType == SimulationObjectClass.MachineLearningBlock):
761 return # We don't want to change from the defaults
762 properties: list[PropertyInfo] = self.properties.ContainedProperties.all()
763 config = self.schema
764 config_properties = config.properties
765 config_groups = config.propertySetGroups
766 def _eval_enabled(prop: PropertyInfo, config_group) -> bool:
767 if self.is_stream():
768 # disable outlet/intermediate stream properties
769 ports = self.connectedPorts.all()
770 if len(ports) == 2 or (len(ports) == 1 and ports[0].direction == "outlet"):
771 return False
772 if config_group is None: 772 ↛ 773line 772 didn't jump to line 773 because the condition on line 772 was never true
773 return True # default to enabled, e.g for custom properties
774 if config_group.type == "stateVars":
775 state_vars = getattr(config_group, "stateVars") or ()
776 return prop.key in state_vars
777 return True # eg. All, default to enabled
779 list_prop_val = []
781 for prop in properties:
782 config_prop = config_properties.get(prop.key)
783 if config_prop: 783 ↛ 786line 783 didn't jump to line 786 because the condition on line 783 was always true
784 group = config_prop.propertySetGroup
785 else:
786 group = "default"
787 config_group = config_groups.get(group, None)
788 res = _eval_enabled(prop, config_group)
789 list_prop_val.extend(prop.enable(res))
791 PropertyValue.objects.bulk_update(list_prop_val, ["enabled"])
794 def get_unspecified_properties(self) -> list:
795 if not hasattr(self, "properties") or not self.properties: 795 ↛ 796line 795 didn't jump to line 796 because the condition on line 795 was never true
796 return []
798 contained_properties: models.QuerySet[PropertyInfo] = self.properties.ContainedProperties.all()
799 is_splitter = getattr(self.schema, "displayType", "").lower() == "splitter"
801 # use schema stateVars to find required properties
802 required_properties: Set[str] = set()
803 for group in self.schema.propertySetGroups.values():
804 if group.toggle and not self.properties.get_property(group.toggle).get_value(): 804 ↛ 806line 804 didn't jump to line 806 because the condition on line 804 was never true
805 # The group is toggled off, so we don't care that the properties are not specified
806 continue
807 if group.type == "stateVars" or group.type == "composition" or group.type == "exceptLast": 807 ↛ 803line 807 didn't jump to line 803 because the condition on line 807 was always true
808 required_properties.update(group.stateVars)
810 # use pre fetched data to avoid additional queries
811 unspecified_properties = []
812 property_info: PropertyInfo
813 for property_info in contained_properties:
814 # check if the property has no values or if all values are invalid
815 has_valid_value = property_info.isSpecified()
816 if not has_valid_value and property_info.key in required_properties:
817 unspecified_properties.append(property_info.key)
819 # check if mole_frac_comp sums to 1
820 mole_frac_props = [x for x in contained_properties if x.key=="mole_frac_comp"]
822 for prop in mole_frac_props:
823 if prop.has_value_bulk():
824 total = 0.0
825 for value in prop.values.all():
826 try:
827 val = float(value.value)
828 total += val
829 except (ValueError, TypeError):
830 continue
831 if abs(total - 1.0) > 0.001 and "mole_frac_comp" not in unspecified_properties: 831 ↛ 832line 831 didn't jump to line 832 because the condition on line 831 was never true
832 unspecified_properties.append("mole_frac_comp")
833 else:
834 if "mole_frac_comp" not in unspecified_properties:
835 unspecified_properties.append("mole_frac_comp")
837 return unspecified_properties
839 def delete(self, *args, **kwargs):
840 raise NotImplementedError("Use delete_object method from DeleteFactory")
842 def permanently_delete(self, *args, **kwargs):
843 """
844 Permanently deletes the object from the database.
845 This should only be used in tests or when you are sure you want to delete the object.
846 """
847 super().delete(*args, **kwargs)