Coverage for backend/flowsheetInternals/unitops/models/Port.py: 88%

67 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1from django.db import models 

2from core.auxiliary.enums import ConType 

3from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject 

4from common.config_types import * 

5from flowsheetInternals.unitops.config.config_methods import * 

6from flowsheetInternals.unitops.config.config_base import configuration 

7 

8from core.managers import AccessControlManager 

9from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate 

10from core.auxiliary.models.IndexedItem import IndexedItem 

11 

12 

class Port(models.Model):
    """A named, indexed connection point belonging to a flowsheet.

    A port is linked to the unit operation that owns it (``unitOp``) and,
    optionally, to a connected stream (``stream``). Ports are ordered by
    ``index`` and then by creation time (see ``Meta.ordering``).
    """

    flowsheet = models.ForeignKey("core_auxiliary.Flowsheet", on_delete=models.CASCADE, related_name="Ports")
    displayName = models.CharField(max_length=64)
    # NOTE(review): no max_length is set here; Django only permits that on some
    # database backends -- confirm this is intentional for the deployed backend.
    direction = models.CharField(choices=ConType.choices)
    key = models.CharField(max_length=64)  # key to identify this port in the schema
    index = models.IntegerField(default=0)  # index of the port
    unitOp = models.ForeignKey('SimulationObject', on_delete=models.CASCADE, related_name="ports", null=True)
    stream = models.ForeignKey('SimulationObject', on_delete=models.SET_NULL, null=True, related_name="connectedPorts")

    created_at = models.DateTimeField(auto_now_add=True)
    objects = AccessControlManager()

    class Meta:
        ordering = ['index', 'created_at']

    def default_stream_position(self, unitop: "flowsheetInternals.unitops.SimulationObject", unitop_graphic_object: GraphicObject, port_index: int, num_ports: int) -> dict[str, float]:
        """Calculate the default position for a stream connected to this port.

        Args:
            unitop: Unit operation whose schema supplies this port's
                ``streamOffset``.
            unitop_graphic_object: Graphic object providing the ``x``, ``y``,
                ``width`` and ``height`` the position is computed from.
            port_index: Position of this port among the ports being laid out.
            num_ports: Total number of ports being laid out; must be non-zero
                because it is used as a divisor.

        Returns:
            A dict with ``"x"`` and ``"y"`` coordinates.
        """
        stream_offset = get_object_schema(unitop).ports[self.key].streamOffset

        # Inlets are offset off one side of the graphic object, every other
        # direction off the opposite side (fractions of the object's width).
        if self.direction == ConType.Inlet:
            xOffset = -stream_offset
        else:
            xOffset = 1 + stream_offset

        # Calculate y offset based on port index and total number of ports
        yOffset = (port_index + 0.5) / num_ports

        return {
            "x": unitop_graphic_object.x + xOffset * unitop_graphic_object.width,
            "y": unitop_graphic_object.y + yOffset * unitop_graphic_object.height,
        }

    def default_stream_name(self, unit_op: "flowsheetInternals.unitops.SimulationObject") -> str:
        """Return the ``streamName`` the schema of ``unit_op`` defines for this port's key."""
        return get_object_schema(unit_op).ports[self.key].streamName

    def reindex_port_on_delete(self):
        """Delete this port and close the gap it leaves in the numbering.

        Ports on the same unit operation with the same key and a higher index
        are shifted down by one and renamed. For an outlet, the indexed item
        keyed ``outlet_<index + 1>`` and its property values are removed, the
        remaining ``splitter_fraction`` items are renumbered, and the unit
        operation is refreshed and asked to re-evaluate which properties are
        enabled.
        """
        unit = self.unitOp
        self._shift_subsequent_ports()

        if self.direction != ConType.Outlet:
            self.delete()
            return

        # You can only delete outlets on something that has a split fraction.
        # Therefore, delete the split fraction property associated with the outlet.
        self._delete_outlet_property_values(unit)
        self._renumber_splitter_items(unit)

        self.delete()
        unit.refresh_from_db()
        unit.reevaluate_properties_enabled()

    def _shift_subsequent_ports(self):
        """Decrement the index of every later same-key port and rename it to match."""
        subsequent_ports = Port.objects.filter(
            unitOp=self.unitOp,
            key=self.key,
            index__gt=self.index,
        ).order_by('index')

        display_name = get_object_schema(self.unitOp).ports[self.key].displayName
        for port in subsequent_ports:
            port.index -= 1
            # Display names are 1-based while the stored index is 0-based.
            port.displayName = f"{display_name} {port.index + 1}"
            port.save()

    def _delete_outlet_property_values(self, unit):
        """Remove the indexed item for this outlet together with its property values."""
        split_keys = ("split_fraction", "priorities", "split_flow")

        # Get the properties of the outlet to be deleted.
        property_infos = unit.properties.ContainedProperties.all()
        # Start from None so a unit without any of the split properties no
        # longer fails with an unbound local further down.
        property_info = None
        for prop in property_infos:
            # TODO: Surely we can use sumToOne or indexed by compounds instead of this?
            if prop.key in split_keys:
                # The last matching key in iteration order wins, as before.
                property_info = property_infos.filter(key=prop.key).first()

        # Get the index of the outlet to be deleted.
        indexed_item = IndexedItem.objects.filter(owner=unit, key=f"outlet_{self.index + 1}").first()
        if indexed_item is None:
            # Nothing to clean up. Filtering with ``indexedItems=None`` would be
            # an IS NULL match and could delete unrelated property values.
            return

        # Delete the outlet and its property values
        if property_info is not None:
            PropertyValue.objects.filter(property=property_info, indexedItems=indexed_item).delete()
        indexed_item.delete()

    def _renumber_splitter_items(self, unit):
        """Re-key and rename the remaining ``splitter_fraction`` items as 1..n."""
        # Re-index the display names of the input fields for the remaining outlets.
        indexed_items = IndexedItem.objects.filter(owner=unit, type="splitter_fraction")
        # Iterating evaluates the queryset once. Indexing it per position would
        # issue a fresh query after every save, and without a guaranteed
        # ordering rows could be skipped or visited twice.
        # NOTE(review): numbering follows whatever order the queryset yields --
        # confirm IndexedItem defines a stable default ordering.
        for position, item in enumerate(indexed_items, start=1):
            # Update the index
            item.key = f"outlet_{position}"
            # Use the updated index to change the name displayed on the frontend
            item.displayName = unit.schema.splitter_fraction_name + f" {position}"
            item.save()

115 

116 

117