Coverage for backend/flowsheetInternals/graphicData/logic/make_group.py: 86%

115 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1from typing import List 

2from core.auxiliary.enums.unitOpData import SimulationObjectClass 

3from core.auxiliary.enums import ConType 

4from flowsheetInternals.graphicData.models.groupingModel import Breadcrumbs, Grouping, GraphicObject 

5from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

6import random 

7from typing import List 

8from core.auxiliary.models.ObjectTypeCounter import ObjectTypeCounter 

9 

10 

def make_group(contained_objects_id: List[int]) -> Grouping:
    """
    Creates a new group ("Module N") around the given simulation objects.

    Before the group is created the selection is normalised:

    * a selected stream is dropped when none of its connected ports belongs to a
      selected non-stream object and none of its groups is a selected group;
    * a stream attached to a port of a selected non-stream object is added when
      the stream's groups include the common parent group.

    Note: contained_objects_id is modified in place by this normalisation.

    Args:
        contained_objects_id (List[int]): Primary keys of the simulation objects
            to include in the group.

    Returns:
        Grouping: The newly created group.

    Raises:
        ValueError: If the selection contains no non-stream object, or if the
            non-stream objects do not all belong to the same group.
    """
    selected_objects = SimulationObject.objects.filter(pk__in=contained_objects_id)
    # Everything that is not a stream is treated as a unit operation here
    # (this includes objects of type Group).
    unit_operations = selected_objects.exclude(objectType=SimulationObjectClass.Stream).all()
    streams = selected_objects.filter(objectType=SimulationObjectClass.Stream)
    selected_group_ids = [
        group_object.id
        for group_object in selected_objects.filter(objectType=SimulationObjectClass.Group)
    ]
    unitop_ids = [unitop.id for unitop in unit_operations]

    # Remove orphaned streams, but keep a stream that sits between two packed groups.
    for stream in streams:
        stream_group_ids = {group.simulationObject.pk for group in stream.get_groups()}
        connected_unitop_ids = {port.unitOp.id for port in stream.connectedPorts.all()}
        touches_selected_unitop = bool(connected_unitop_ids.intersection(unitop_ids))
        touches_selected_group = bool(stream_group_ids.intersection(selected_group_ids))
        if not touches_selected_unitop and not touches_selected_group:
            contained_objects_id.remove(stream.id)

    if not unit_operations.exists():
        raise ValueError("No unit operations provided to determine group.")

    # All unit operations must share one group; it becomes the parent of the new group.
    parent_group = unit_operations.first().graphicObject.last().group
    if any(unitop.graphicObject.last().group != parent_group for unitop in unit_operations):
        raise ValueError("All unit operations must belong to the same group.")

    # Pull in attached streams, but only those already present in the parent group.
    for unitop in unit_operations:
        for port in unitop.ports.all():
            if not port.stream or port.stream.id in contained_objects_id:
                continue
            stream_group_pks = [group.pk for group in port.stream.get_groups()]
            if parent_group.pk in stream_group_pks:
                contained_objects_id.append(port.stream.id)

    # Re-query because the id list may have changed above.
    simulation_objects = SimulationObject.objects.filter(pk__in=contained_objects_id)
    flowsheet = simulation_objects.first().flowsheet

    idx_for_type = ObjectTypeCounter.next_for(flowsheet, "module")
    component_name = f"Module {idx_for_type}"

    # Create the new group
    new_group = Grouping.create(flowsheet, parent_group, componentName=component_name)
    new_group.update_internal_simulation_objects(simulation_objects)
    new_group.set_group_size()

    inlet_streams = []
    outlet_streams = []

    # Sort the streams by how many ports they are connected to.
    for simulation_object in simulation_objects:
        if not simulation_object.is_stream():
            continue

        port_count = simulation_object.connectedPorts.count()
        if port_count == 1:
            # Stream connected to a single port: propagate it as an inlet or outlet.
            if simulation_object.connectedPorts.all()[0].direction == ConType.Inlet:
                inlet_streams.append(simulation_object)
            else:
                outlet_streams.append(simulation_object)
        elif port_count == 2:
            # Intermediate stream connected on both ends.
            propagate_intermediate_streams(simulation_object, contained_objects_id)

    propagate_streams(inlet_streams, ConType.Inlet)
    propagate_streams(outlet_streams, ConType.Outlet)

    # Update the group of the stream data entries
    for unitop in unit_operations:
        for stream_data_entry in unitop.StreamDataEntries.all():
            stream_data_entry.group = new_group
            stream_data_entry.save()

    return new_group

94 

95 

def propagate_streams(streams: List[SimulationObject], direction: ConType):
    """
    Creates a graphic object for each stream in every parent group of its connected unit operation.

    Entries that are not streams, that are not connected to exactly one port,
    or that have no existing graphic object are skipped. A parent group that
    already holds a graphic object for the stream is left untouched.

    Args:
        streams (List[SimulationObject]): The streams to propagate. The position
            of a stream in this list determines its vertical placement.
        direction (ConType): ConType.Inlet selects the inlet horizontal offset;
            any other value selects the outlet offset.
    """
    # Horizontal offset, expressed in multiples of the group graphic's width.
    x_offset = -1 if direction == ConType.Inlet else 2
    stream_count = len(streams)

    for position, simulation_object in enumerate(streams):
        if not simulation_object.is_stream():
            continue

        # make sure the stream is connected to one port (0 and 2 or more should not propagate)
        connected_ports = simulation_object.connectedPorts.all()
        if len(connected_ports) != 1:
            continue

        # the original graphic object supplies the size and the reference group
        original_graphic_object = simulation_object.graphicObject.first()
        if not original_graphic_object:
            continue

        # Spread the streams evenly along the height of the group graphic.
        y_offset = (0.5 + position) / stream_count

        # propagate to all parent groups of the connected unit operation
        unit_op = connected_ports[0].unitOp
        for parent_group in unit_op.get_parent_groups():
            if simulation_object.graphicObject.filter(group=parent_group).exists():
                continue

            group_graphic = original_graphic_object.group.get_graphic_object()
            GraphicObject.objects.create(
                flowsheet=simulation_object.flowsheet,
                simulationObject=simulation_object,
                width=original_graphic_object.width,
                height=original_graphic_object.height,
                x=group_graphic.x - 12.5 + x_offset * group_graphic.width,
                y=group_graphic.y - 12.5 + y_offset * group_graphic.height,
                group=parent_group,
            )

135 

136 

def propagate_intermediate_streams(simulation_object: SimulationObject, contained_objects_id: List[int]):
    """
    Re-creates the graphic objects of an intermediate stream in the groups between its two ends.

    The stream's existing graphic objects are deleted and one new graphic
    object is created in every group on the path from each connected unit
    operation's group up to their lowest common ancestor group (inclusive).
    The new graphic objects reuse the size and position of the stream's last
    existing graphic object.

    Nothing is changed when the object is not a stream, is not connected to
    exactly two ports, has no graphic object, or when the two ends share no
    ancestor group.

    Args:
        simulation_object (SimulationObject): The intermediate stream to propagate.
        contained_objects_id (List[int]): The IDs of objects being abstracted.
            Currently not used by this function; kept for interface compatibility.
    """
    if not simulation_object.is_stream():
        return

    # make sure the stream is an intermediate stream (connected to exactly two ports)
    connected_ports = list(simulation_object.connectedPorts.all())
    if len(connected_ports) != 2:
        return

    # keep the last graphic object as the template for size and position
    original_graphic_object = simulation_object.graphicObject.last()
    if not original_graphic_object:
        return

    def get_group_path(group):
        """Return the chain of groups from the given group up to the root: [leaf, ..., root]."""
        path = []
        while group:
            path.append(group)
            group = group.get_parent_group()
        return path

    def groups_to_add_intermediate():
        """
        Return the groups in which the intermediate stream should get a graphic object.

        These are the groups from each end's own group up to the lowest common
        ancestor (LCA) of both ends, with the LCA included once. Returns an
        empty list when the two ends have no common ancestor.
        """
        first_port, second_port = connected_ports
        path1 = get_group_path(first_port.unitOp.get_group())
        path2 = get_group_path(second_port.unitOp.get_group())

        # The LCA is the first group in path1 that is also in path2
        lca = next((group for group in path1 if group in path2), None)
        if lca is None:
            return []

        # All groups from path1 up to and including the LCA
        groups = path1[: path1.index(lca) + 1]
        # All groups from path2 up to but not including the LCA (avoid duplicates)
        for group in path2:
            if group == lca:
                break
            if group not in groups:
                groups.append(group)
        return groups

    # Work out the target groups before deleting anything, so a stream without
    # a common ancestor group keeps its current graphic objects.
    groups_to_add = groups_to_add_intermediate()
    if not groups_to_add:
        return

    simulation_object.graphicObject.all().delete()
    for group in groups_to_add:
        if not simulation_object.graphicObject.filter(group=group).exists():
            GraphicObject.objects.create(
                flowsheet=simulation_object.flowsheet,
                simulationObject=simulation_object,
                width=original_graphic_object.width,
                height=original_graphic_object.height,
                x=original_graphic_object.x,
                y=original_graphic_object.y,
                group=group,
            )

218