Coverage for backend/flowsheetInternals/unitops/models/Port.py: 88%
67 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1from django.db import models
2from core.auxiliary.enums import ConType
3from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject
4from common.config_types import *
5from flowsheetInternals.unitops.config.config_methods import *
6from flowsheetInternals.unitops.config.config_base import configuration
8from core.managers import AccessControlManager
9from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate
10from core.auxiliary.models.IndexedItem import IndexedItem
class Port(models.Model):
    """A named connection point on a unit operation.

    A port belongs to a flowsheet and (optionally) to a unit operation, and
    may be connected to a stream. Ports that share the same schema ``key`` on
    one unit operation are distinguished by ``index``.
    """

    flowsheet = models.ForeignKey("core_auxiliary.Flowsheet", on_delete=models.CASCADE, related_name="Ports")
    displayName = models.CharField(max_length=64)
    direction = models.CharField(choices=ConType.choices)
    key = models.CharField(max_length=64)  # key to identify this port in the schema
    index = models.IntegerField(default=0)  # index of the port
    unitOp = models.ForeignKey('SimulationObject', on_delete=models.CASCADE, related_name="ports", null=True)
    stream = models.ForeignKey('SimulationObject', on_delete=models.SET_NULL, null=True, related_name="connectedPorts")
    created_at = models.DateTimeField(auto_now_add=True)
    objects = AccessControlManager()

    class Meta:
        ordering = ['index', 'created_at']

    def default_stream_position(self, unitop: "flowsheetInternals.unitops.SimulationObject", unitop_graphic_object: GraphicObject, port_index: int, num_ports: int) -> dict[str, float]:
        """Calculate the default position for a stream connected to this port.

        Args:
            unitop: Unit operation whose schema supplies this port's
                ``streamOffset``.
            unitop_graphic_object: Graphic object providing the unit
                operation's ``x``, ``y``, ``width`` and ``height``.
            port_index: Zero-based slot of this port among the ports being
                laid out along the same side.
            num_ports: Total number of slots on that side; must be non-zero.

        Returns:
            A dict with ``"x"`` and ``"y"`` coordinates for the stream.
        """
        stream_offset = get_object_schema(unitop).ports[self.key].streamOffset

        # Offsets are fractions of the unit op's size: inlets are pushed out
        # past the left edge (negative x), everything else past the right edge.
        if self.direction == ConType.Inlet:
            x_fraction = -stream_offset
        else:
            x_fraction = 1 + stream_offset

        # Centre each port within its own equal-height slice of the unit op.
        y_fraction = (port_index + 0.5) / num_ports

        return {
            "x": unitop_graphic_object.x + x_fraction * unitop_graphic_object.width,
            "y": unitop_graphic_object.y + y_fraction * unitop_graphic_object.height,
        }

    def default_stream_name(self, unit_op: "flowsheetInternals.unitops.SimulationObject") -> str:
        """Return the schema-defined ``streamName`` for this port on ``unit_op``."""
        return get_object_schema(unit_op).ports[self.key].streamName

    def reindex_port_on_delete(self) -> None:
        """Delete this port and close the numbering gap it leaves behind.

        1. Ports on the same unit operation that share this port's ``key``
           and have a higher ``index`` are shifted down by one and their
           display names are renumbered.
        2. For outlet ports only, the ``outlet_<n>`` indexed item for this
           port and its property values are removed, and the remaining
           ``splitter_fraction`` indexed items are renumbered from 1.
        3. The port itself is deleted; for outlets the unit operation is then
           reloaded and asked to re-evaluate which properties are enabled.
        """
        unit = self.unitOp
        deleted_index = self.index

        base_name = get_object_schema(unit).ports[self.key].displayName
        later_ports = Port.objects.filter(
            unitOp=unit,
            key=self.key,
            index__gt=deleted_index,
        ).order_by('index')
        for port in later_ports:
            port.index -= 1
            port.displayName = f"{base_name} {port.index + 1}"
            port.save()

        if self.direction != ConType.Outlet:
            self.delete()
            return

        # Outlet keys are 1-based while port indices are 0-based.
        self._delete_outlet_split_data(unit, deleted_index + 1)
        self._renumber_splitter_items(unit)

        self.delete()
        unit.refresh_from_db()
        unit.reevaluate_properties_enabled()

    def _delete_outlet_split_data(self, unit, outlet_number: int) -> None:
        """Remove the ``outlet_<outlet_number>`` indexed item and its property values."""
        # TODO: Surely we can use sumToOne or indexed by compounds instead of this?
        split_keys = ("split_fraction", "priorities", "split_flow")
        contained_properties = unit.properties.ContainedProperties.all()

        # As before, the last split-related property encountered is the one
        # whose values get cleaned up.
        matched_key = None
        for candidate in contained_properties:
            if candidate.key in split_keys:
                matched_key = candidate.key

        # May stay None when the unit has no split-related property; previously
        # that case crashed with UnboundLocalError.
        property_info = None
        if matched_key is not None:
            property_info = contained_properties.filter(key=matched_key).first()

        indexed_item = IndexedItem.objects.filter(owner=unit, key=f"outlet_{outlet_number}").first()
        if indexed_item is None:
            # Nothing to clean up. Do NOT run the PropertyValue delete here:
            # Django turns a filter against None into an IS NULL lookup, which
            # would presumably match values that have no indexed item at all
            # (NOTE(review): confirm against the PropertyValue model).
            return

        if property_info is not None:
            PropertyValue.objects.filter(property=property_info, indexedItems=indexed_item).delete()
        indexed_item.delete()

    def _renumber_splitter_items(self, unit) -> None:
        """Rewrite keys and display names of remaining splitter items as 1..N."""
        remaining_items = IndexedItem.objects.filter(owner=unit, type="splitter_fraction")
        # Iterate the queryset once: positional indexing issues one query per
        # item and gives no stable order while the rows are being rewritten.
        for number, item in enumerate(remaining_items, start=1):
            item.key = f"outlet_{number}"
            # Use the updated number for the name displayed on the frontend.
            item.displayName = f"{unit.schema.splitter_fraction_name} {number}"
            item.save()