Coverage for backend/django/flowsheetInternals/graphicData/logic/make_group.py: 88%

140 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-12-18 04:00 +0000

1from typing import List 

2from core.auxiliary.enums.unitOpData import SimulationObjectClass 

3from core.auxiliary.enums import ConType 

4from flowsheetInternals.graphicData.models.groupingModel import Breadcrumbs, Grouping, GraphicObject 

5from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

6import random 

7from typing import List 

8from core.auxiliary.models.ObjectTypeCounter import ObjectTypeCounter 

9from django.db.models import Count, QuerySet, Prefetch 

10from flowsheetInternals.unitops.models.Port import Port 

11 

12 

def make_group(contained_objects_id: List[int]) -> Grouping:
    """
    Create a new group ("Module N") out of a selection of simulation objects.

    The selection is normalised before the group is created:

    * selected streams that touch neither a selected unit operation nor a
      selected group are dropped from the selection;
    * streams that appear on more than one selected connection, streams on
      the selected unit operations' ports that live in the same parent
      group, and single-port (inlet/outlet) streams are added to it.

    After the group is created, single-port streams are propagated through
    ``propagate_streams`` and two-port streams through
    ``propagate_intermediate_streams``. Finally the stream data entries of
    the selected unit operations are re-assigned to the new group.

    Args:
        contained_objects_id (List[int]): Primary keys of the simulation
            objects to include in the group. NOTE: this list is modified in
            place (ids are removed from and appended to it).

    Returns:
        Grouping: The newly created group.

    Raises:
        ValueError: If the selection contains no unit operations, or if the
            selected unit operations do not all belong to the same group.
    """
    simulation_objects = SimulationObject.objects.filter(pk__in=contained_objects_id).prefetch_related(
        Prefetch(
            "connectedPorts",
            queryset=Port.objects.select_related("unitOp"),
        )
    )
    # Recycle blocks are excluded along with streams because they are
    # propagated like streams rather than treated as unit operations.
    unit_operations = (
        simulation_objects.exclude(objectType=SimulationObjectClass.Stream)
        .exclude(objectType=SimulationObjectClass.Recycle)
    )
    streams = simulation_objects.filter(objectType=SimulationObjectClass.Stream)
    selected_group_ids = [g.id for g in simulation_objects.filter(objectType=SimulationObjectClass.Group)]
    unitop_ids = [uo.id for uo in unit_operations]

    # Remove orphaned streams. A stream sitting between two packed groups is
    # not an orphan, hence the second check against the selected groups.
    for stream in streams:
        stream_group_ids = [g.simulationObject.pk for g in stream.get_groups()]
        connected_unitop_ids = [p.unitOp.id for p in stream.connectedPorts.all()]
        touches_selected_unitop = bool(set(connected_unitop_ids).intersection(unitop_ids))
        inside_selected_group = bool(set(stream_group_ids).intersection(selected_group_ids))
        if not touches_selected_unitop and not inside_selected_group:
            contained_objects_id.remove(stream.id)

    if not unit_operations.exists():
        raise ValueError("No unit operations provided to determine group.")

    # Every selected unit operation must live in the same (parent) group.
    parent_group: Grouping = unit_operations.first().graphicObject.last().group
    if any(unitop.graphicObject.last().group != parent_group for unitop in unit_operations):
        raise ValueError("All unit operations must belong to the same group.")

    # Connections of the parent group that involve the selected objects.
    # NOTE(review): connection.unitOp / connection.stream are compared against
    # and appended to lists of primary keys, so they are presumably pk values
    # rather than model instances - confirm against Grouping.get_connections().
    simulation_object_ids = [obj.pk for obj in simulation_objects]
    selected_connections = [
        connection
        for connection in parent_group.get_connections()
        if connection.unitOp in simulation_object_ids
    ]

    # Streams seen on a second (or later) selected connection are intermediate
    # streams that may not have been part of the initial selection.
    seen_streams = set()
    hanging_streams = []
    for connection in selected_connections:
        if connection.stream in seen_streams:
            hanging_streams.append(connection.stream)
        else:
            seen_streams.add(connection.stream)

    for stream in hanging_streams:
        if stream not in contained_objects_id:
            contained_objects_id.append(stream)

    # Include streams on the selected unit operations' ports, but only if
    # they belong to the same parent group.
    for unitop in unit_operations:
        for port in unitop.ports.all():
            if port.stream and port.stream.id not in contained_objects_id:
                stream_parent_group_ids = [g.pk for g in port.stream.get_groups()]
                if parent_group.pk in stream_parent_group_ids:
                    contained_objects_id.append(port.stream.id)

    # Inlet and outlet streams: streams from the selected connections that are
    # attached to exactly one port.
    in_out_streams = (
        SimulationObject.objects.annotate(port_count=Count('connectedPorts'))
        .filter(pk__in=seen_streams, port_count=1)
        .values_list('pk', flat=True)
    )
    for stream in in_out_streams:
        if stream not in contained_objects_id:
            contained_objects_id.append(stream)

    # Re-filter simulation objects now that the selection is final.
    simulation_objects = SimulationObject.objects.filter(pk__in=contained_objects_id)

    flowsheet = simulation_objects.first().flowsheet
    idx_for_type = ObjectTypeCounter.next_for(flowsheet, "module")
    componentName = f"Module {idx_for_type}"

    # Create the new group and move the selection into it.
    new_group = Grouping.create(flowsheet, parent_group, componentName=componentName)
    new_group.update_internal_simulation_objects(simulation_objects)
    new_group.set_group_size()

    inlet_streams = []
    outlet_streams = []

    for simulation_object in simulation_objects:
        if not simulation_object.is_stream():
            continue

        port_count = simulation_object.connectedPorts.count()
        if port_count == 1:
            # Streams connected to one port are collected and propagated below.
            if simulation_object.connectedPorts.all()[0].direction == ConType.Inlet:
                inlet_streams.append(simulation_object)
            else:
                outlet_streams.append(simulation_object)
        elif port_count == 2:
            # Intermediate streams connected to the module.
            propagate_intermediate_streams(simulation_object, contained_objects_id)

    propagate_streams(inlet_streams, ConType.Inlet)
    propagate_streams(outlet_streams, ConType.Outlet)

    # Move the stream data entries of the selected unit operations to the new group.
    for unitop in unit_operations:
        for stream_data_entry in unitop.StreamDataEntries.all():
            stream_data_entry.group = new_group
            stream_data_entry.save()

    return new_group

129 

130 

def propagate_streams(streams: List[SimulationObject], direction: ConType):
    """
    Create graphic objects for single-port streams in every parent group of
    the unit operation they are connected to.

    For each stream, a ``GraphicObject`` is created in each parent group that
    does not already contain one for that stream. The new graphic object is
    positioned relative to the graphic object of the group that holds the
    stream's original graphic: inlet streams are offset by -1 module width,
    all other streams by +2 module widths, and the streams are spread evenly
    over the module's height in list order.

    Entries that are not streams, are not connected to exactly one port, or
    have no existing graphic object are skipped.

    Args:
        streams (List[SimulationObject]): The streams to propagate.
        direction (ConType): ``ConType.Inlet`` for inlet streams; any other
            value is treated as outlet.
    """
    x_offset = -1 if direction == ConType.Inlet else 2
    stream_count = len(streams)

    for position, simulation_object in enumerate(streams):
        if not simulation_object.is_stream():
            continue

        # Only streams connected to exactly one port are propagated
        # (0 and 2 or more should not propagate).
        connected_ports: QuerySet[Port] = simulation_object.connectedPorts.all()
        if len(connected_ports) != 1:
            continue

        # The original graphic object is the size/position reference.
        original_stream_graphic_object: GraphicObject = simulation_object.graphicObject.first()
        if not original_stream_graphic_object:
            continue

        # Position in the list decides the vertical slot, so skipped entries
        # still leave their slot empty.
        y_offset = (0.5 + position) / stream_count

        # Propagate to all parent groups of the connected unit operation.
        unit_op: SimulationObject = connected_ports[0].unitOp
        for parent_group in unit_op.get_parent_groups():
            if simulation_object.graphicObject.filter(group=parent_group).exists():
                continue

            module_graphic_object = original_stream_graphic_object.group.get_graphic_object()

            GraphicObject.objects.create(
                flowsheet=simulation_object.flowsheet,
                simulationObject=simulation_object,
                width=original_stream_graphic_object.width,
                height=original_stream_graphic_object.height,
                x=module_graphic_object.x - 12.5 + x_offset * module_graphic_object.width,
                y=module_graphic_object.y - 12.5 + y_offset * module_graphic_object.height,
                group=parent_group,
            )

173 

174 

def propagate_intermediate_streams(simulation_object: SimulationObject, contained_objects_id: List[int]):
    """
    Rebuild the graphic objects of an intermediate stream (a stream connected
    to exactly two ports).

    All existing graphic objects of the stream are deleted and a new one is
    created, with the size and position of the stream's last graphic object,
    in every group on the path between the groups of the two connected unit
    operations, up to and including their lowest common ancestor group. If
    the stream has a recycle connection, a graphic object for the recycle
    block is also created below the stream in each of those groups that does
    not already contain one.

    Nothing happens if the object is not a stream, is not connected to
    exactly two ports, or has no graphic object.

    Args:
        simulation_object (SimulationObject): The intermediate stream to propagate.
        contained_objects_id (List[int]): The IDs of objects being abstracted.
            Currently unused; kept so existing callers keep working.
    """
    if not simulation_object.is_stream():
        return

    # Make sure the stream is an intermediate stream (exactly two ports).
    connected_ports = simulation_object.connectedPorts.all()
    if len(connected_ports) != 2:
        return

    # Captured before the delete below so its size/position can be reused.
    original_graphic_object = simulation_object.graphicObject.last()
    if not original_graphic_object:
        return

    def get_group_path(group):
        """Return the chain of groups from ``group`` up to the root: [leaf, ..., root]."""
        path = []
        while group:
            path.append(group)
            group = group.get_parent_group()
        return path

    def groups_to_add_intermediate():
        """
        Return the groups in which the intermediate stream should be shown:
        every group from each end's group up to their lowest common ancestor
        (LCA), with the LCA itself included once. Empty if there is no
        common ancestor.
        """
        port1, port2 = connected_ports
        path1 = get_group_path(port1.unitOp.get_group())
        path2 = get_group_path(port2.unitOp.get_group())

        # The LCA is the first group in path1 that is also in path2.
        lca = next((group for group in path1 if group in path2), None)
        if lca is None:
            return []

        # Groups from path1 up to and including the LCA.
        groups = []
        for group in path1:
            groups.append(group)
            if group == lca:
                break
        # Groups from path2 up to but not including the LCA (avoid duplicates).
        for group in path2:
            if group == lca:
                break
            if group not in groups:
                groups.append(group)
        return groups

    # Drop the stream's current graphics, then recreate one per target group.
    simulation_object.graphicObject.all().delete()
    for group in groups_to_add_intermediate():
        if not simulation_object.graphicObject.filter(group=group).exists():
            GraphicObject.objects.create(
                flowsheet=simulation_object.flowsheet,
                simulationObject=simulation_object,
                width=original_graphic_object.width,
                height=original_graphic_object.height,
                x=original_graphic_object.x,
                y=original_graphic_object.y,
                group=group,
            )

        # Recycle graphic objects are only needed if the stream has a recycle connection.
        if not simulation_object.has_recycle_connection:
            continue

        recycle = simulation_object.recycleConnection
        recycle_graphics = recycle.simulationObject.graphicObject

        # Prevent duplicating the recycle graphic object in the same group.
        if recycle_graphics.filter(group=group).exists():
            continue

        # Reuse the size of an existing recycle graphic, falling back to 32x32.
        default_graphic = recycle_graphics.last()
        default_width = default_graphic.width if default_graphic else 32
        default_height = default_graphic.height if default_graphic else 32

        # The recycle block is positioned relative to the stream graphic in this group.
        stream_graphic = simulation_object.graphicObject.filter(group=group).last()
        if stream_graphic is None:
            continue

        # Horizontally centred on the stream, 30 units below its bottom edge.
        GraphicObject.objects.create(
            flowsheet=simulation_object.flowsheet,
            simulationObject=recycle.simulationObject,
            width=default_width,
            height=default_height,
            x=stream_graphic.x + (stream_graphic.width - default_width) / 2,
            y=stream_graphic.y + stream_graphic.height + 30,
            group=group,
        )

286