Coverage for backend/django/flowsheetInternals/graphicData/logic/make_group.py: 88%

144 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-03-26 20:57 +0000

1from typing import List 

2from core.auxiliary.enums.unitOpData import SimulationObjectClass 

3from core.auxiliary.enums import ConType 

4from flowsheetInternals.graphicData.models.groupingModel import Breadcrumbs, Grouping, GraphicObject 

5from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

6import random 

7from typing import List 

8from core.auxiliary.models.ObjectTypeCounter import ObjectTypeCounter 

9from django.db.models import Count, QuerySet, Prefetch 

10from flowsheetInternals.unitops.models.Port import Port 

11 

12 

def make_group(contained_objects_id: List[int]) -> Grouping:
    """
    Creates a group (module) containing the selected simulation objects.

    The new group is created inside the group shared by the selected unit
    operations. Streams attached to the selection are pulled in, and the
    graphic objects of inlet, outlet and intermediate streams are propagated
    once the group exists.

    Note: ``contained_objects_id`` is modified in place. Orphaned streams are
    removed from it and streams connected to the selection are appended.

    Args:
        contained_objects_id (List[SimulationObject id]): Simulation objects to include in the group.

    Returns:
        Grouping: The newly created group.

    Raises:
        ValueError: If the selection contains no unit operations, or if the
            unit operations do not all belong to the same group.
    """
    simulation_objects = SimulationObject.objects.filter(pk__in=contained_objects_id).prefetch_related(
        Prefetch(
            "connectedPorts",
            queryset=Port.objects.select_related("unitOp"),
        )
    )
    # Recycle blocks are excluded because we want to propagate them like streams
    unit_operations = (
        simulation_objects.exclude(objectType=SimulationObjectClass.Stream)
        .exclude(objectType=SimulationObjectClass.Recycle)
    )
    streams = simulation_objects.filter(objectType=SimulationObjectClass.Stream)
    group_ids = [g.id for g in simulation_objects.filter(objectType=SimulationObjectClass.Group)]
    unitop_ids = [uo.id for uo in unit_operations]

    # Remove orphaned streams
    for stream in streams:
        stream_group_ids = {g.simulationObject.pk for g in stream.get_groups()}
        connected_unitop_ids = {p.unitOp.id for p in stream.connectedPorts.all()}
        # A stream is an orphan when it touches none of the selected unit operations.
        # The group check catches the case where the stream sits between two packed groups.
        if connected_unitop_ids.isdisjoint(unitop_ids) and stream_group_ids.isdisjoint(group_ids):
            contained_objects_id.remove(stream.id)

    if not unit_operations.exists():
        raise ValueError("No unit operations provided to determine group.")

    # Ensure all unit operations share the same group
    parent_group: Grouping = unit_operations.first().graphicObject.last().group
    if any(unitop.graphicObject.last().group != parent_group for unitop in unit_operations):
        raise ValueError("All unit operations must belong to the same group.")

    # Get all connections in current group within the contained objects
    simulation_object_ids = [obj.pk for obj in simulation_objects]
    selected_connections = [
        connection
        for connection in parent_group.get_connections()
        if connection.unitOp in simulation_object_ids
    ]

    # Find intermediate streams which were not part of the initial selection (hanging streams).
    # A stream is "hanging" when it shows up in more than one selected connection;
    # seen_streams ends up holding every stream touched by the selection.
    seen_streams = set()
    hanging_streams = []
    for connection in selected_connections:
        if connection.stream in seen_streams:
            hanging_streams.append(connection.stream)
        else:
            seen_streams.add(connection.stream)

    # Add hanging streams to contained objects if not already included
    for stream in hanging_streams:
        if stream not in contained_objects_id:
            contained_objects_id.append(stream)

    # Include streams only if they belong to the same group
    for unitop in unit_operations:
        for port in unitop.ports.all():
            if port.stream and port.stream.id not in contained_objects_id:
                stream_parent_group_ids = [g.pk for g in port.stream.get_groups()]
                if parent_group.pk in stream_parent_group_ids:
                    contained_objects_id.append(port.stream.id)

    # Get inlet and outlet streams connected to groups (streams with exactly one connected port).
    # This is a lazy values_list queryset of primary keys.
    in_out_streams = (
        SimulationObject.objects.annotate(port_count=Count('connectedPorts'))
        .filter(pk__in=seen_streams, port_count=1)
        .values_list('pk', flat=True)
    )

    # Add inlet and outlet streams to contained objects if not already included
    for stream in in_out_streams:
        if stream not in contained_objects_id:
            contained_objects_id.append(stream)

    # Re-filter simulation objects after appending streams
    simulation_objects = SimulationObject.objects.filter(pk__in=contained_objects_id)

    flowsheet = simulation_objects.first().flowsheet

    idx_for_type = ObjectTypeCounter.next_for(flowsheet, "module")
    componentName = f"Module {idx_for_type}"

    # Create the new group
    new_group = Grouping.create(flowsheet, parent_group, componentName=componentName)
    new_group.update_internal_simulation_objects(simulation_objects)
    new_group.set_group_size()

    inlet_streams = []
    outlet_streams = []

    # handle streams for all simulation objects
    for simulation_object in simulation_objects:
        if not simulation_object.is_stream():
            continue

        port_count = simulation_object.connectedPorts.count()
        if port_count == 1:
            # streams connected to one port are propagated as inlets or outlets
            if simulation_object.connectedPorts.all()[0].direction == ConType.Inlet:
                inlet_streams.append(simulation_object)
            else:
                outlet_streams.append(simulation_object)
        elif port_count == 2:
            # handle intermediate streams connected to the module
            propagate_intermediate_streams(simulation_object, contained_objects_id)

    propagate_streams(inlet_streams, ConType.Inlet)
    propagate_streams(outlet_streams, ConType.Outlet)

    # update zone of stream data entries
    for unitop in unit_operations:
        for stream_data_entry in unitop.StreamDataEntries.all():
            stream_data_entry.group = new_group
            stream_data_entry.save()

    return new_group

127 

128 

def propagate_streams(streams: List[SimulationObject], direction: ConType):
    """
    Propagates each stream's graphic object to all parent groups of its connected unit operation.

    A graphic object is created in every parent group that does not already
    have one for the stream. It is positioned relative to the graphic object
    of the group that holds the stream's original graphic object. Entries
    that are not streams, that are not connected to exactly one port, or that
    have no graphic object are skipped.

    Args:
        streams (List[SimulationObject]): The streams to propagate.
        direction (ConType): ConType.Inlet places the streams using a
            horizontal offset of -1 module widths; any other value uses 2.
    """
    x_offset = -1 if direction == ConType.Inlet else 2
    stream_count = len(streams)

    for position, simulation_object in enumerate(streams):
        if stream_count <= 2:
            y_offset = (0.5 + position) / stream_count
            y_gap = -12.5
        else:
            # space out the streams if there are too many of them
            y_offset = 0.5 * position
            y_gap = -12.5 * (2 * stream_count - 5)

        if not simulation_object.is_stream():
            continue

        # make sure the stream is connected to one port (0 and 2 or more should not propagate)
        connected_ports: QuerySet[Port] = simulation_object.connectedPorts.all()
        if len(connected_ports) != 1:
            continue

        # get the original graphic object for reference
        original_stream_graphic_object: GraphicObject = simulation_object.graphicObject.first()
        if not original_stream_graphic_object:
            continue

        # propagate to all parent groups of the connected unit operation
        unit_op: SimulationObject = connected_ports[0].unitOp
        for parent_group in unit_op.get_parent_groups():
            if simulation_object.graphicObject.filter(group=parent_group).exists():
                continue

            module_graphic_object = original_stream_graphic_object.group.get_graphic_object()

            GraphicObject.objects.create(
                flowsheet=simulation_object.flowsheet,
                simulationObject=simulation_object,
                width=original_stream_graphic_object.width,
                height=original_stream_graphic_object.height,
                x=module_graphic_object.x - 12.5 + x_offset * module_graphic_object.width,
                y=module_graphic_object.y + y_gap + y_offset * module_graphic_object.height,
                group=parent_group,
            )

179 

180 

def propagate_intermediate_streams(simulation_object: SimulationObject, contained_objects_id: List[int]):
    """
    Handles the special case where an intermediate stream is connected to a module.

    All existing graphic objects of the stream are deleted and recreated in
    every group on the path between the groups of its two connected unit
    operations, up to and including their lowest common ancestor group. The
    recreated objects reuse the size and position of the stream's last
    graphic object. If the stream has a recycle connection, a graphic object
    for the recycle block is created below the stream in each of those groups
    that does not already have one.

    Nothing happens if the object is not a stream, is not connected to exactly
    two ports, or has no graphic object.

    Args:
        simulation_object (SimulationObject): The intermediate stream to propagate.
        contained_objects_id (List[int]): The IDs of objects being abstracted.
            Currently unused; kept so existing callers keep working.
    """
    if not simulation_object.is_stream():
        return

    # make sure the stream is an intermediate stream (connected to exactly two ports)
    connected_ports = simulation_object.connectedPorts.all()
    if len(connected_ports) != 2:
        return

    # get the original graphic object for reference (captured before the delete below)
    original_graphic_object = simulation_object.graphicObject.last()
    if not original_graphic_object:
        return

    def get_group_path(group):
        """Return the chain of groups from the given group up to the root: [leaf, ..., root]."""
        path = []
        while group:
            path.append(group)
            group = group.get_parent_group()
        return path

    def groups_to_add_intermediate():
        """
        Return the groups to which the intermediate stream should be added.

        These are the groups from each end of the stream up to their lowest
        common ancestor (LCA); the LCA itself is included once.
        """
        port1, port2 = connected_ports

        path1 = get_group_path(port1.unitOp.get_group())
        path2 = get_group_path(port2.unitOp.get_group())

        # The LCA is the first group in path1 that is also in path2
        lca = next((group for group in path1 if group in path2), None)
        if lca is None:
            return []

        # Collect all groups from path1 up to and including the LCA
        groups = []
        for group in path1:
            groups.append(group)
            if group == lca:
                break
        # Collect all groups from path2 up to but not including the LCA (avoid duplicates)
        for group in path2:
            if group == lca:
                break
            if group not in groups:
                groups.append(group)
        return groups

    # Start from a clean slate, then add the stream to exactly the groups computed above
    simulation_object.graphicObject.all().delete()
    for group in groups_to_add_intermediate():
        if not simulation_object.graphicObject.filter(group=group).exists():
            GraphicObject.objects.create(
                flowsheet=simulation_object.flowsheet,
                simulationObject=simulation_object,
                width=original_graphic_object.width,
                height=original_graphic_object.height,
                x=original_graphic_object.x,
                y=original_graphic_object.y,
                group=group
            )

        # Creates recycle graphic objects in this group iteration if stream has a recycle connection
        if not simulation_object.has_recycle_connection:
            continue

        # Get the recycle associated with this stream
        recycle = simulation_object.recycleConnection

        # Prevent duplicating the recycle graphic object in the same group
        if recycle.simulationObject.graphicObject.filter(group=group).exists():
            continue

        # Size the recycle block like its latest graphic object, falling back to 32x32
        default_graphic = recycle.simulationObject.graphicObject.last()
        default_width = default_graphic.width if default_graphic else 32
        default_height = default_graphic.height if default_graphic else 32

        # Make sure a stream graphic exists in this group
        stream_graphic = simulation_object.graphicObject.filter(group=group).last()
        if stream_graphic is None:
            continue

        # Recreate the recycle graphic object, horizontally centred below the stream
        GraphicObject.objects.create(
            flowsheet=simulation_object.flowsheet,
            simulationObject=recycle.simulationObject,
            width=default_width,
            height=default_height,
            x=stream_graphic.x + (stream_graphic.width - default_width) / 2,
            y=stream_graphic.y + stream_graphic.height + 30,
            group=group,
        )

292