Coverage for backend/django/flowsheetInternals/graphicData/logic/make_group.py: 88%
140 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-12-18 04:00 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-12-18 04:00 +0000
1from typing import List
2from core.auxiliary.enums.unitOpData import SimulationObjectClass
3from core.auxiliary.enums import ConType
4from flowsheetInternals.graphicData.models.groupingModel import Breadcrumbs, Grouping, GraphicObject
5from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
6import random
7from typing import List
8from core.auxiliary.models.ObjectTypeCounter import ObjectTypeCounter
9from django.db.models import Count, QuerySet, Prefetch
10from flowsheetInternals.unitops.models.Port import Port
13def make_group(contained_objects_id: List[int]) -> Grouping:
14 """
15 Creates a group
17 Args:
18 containedObjects_id (List[SimulationObject id]): Simulation objects to include in the group.
19 is_abstract: (bool): This hasn't really been implemented yet, but the idea was to have
20 groups shown fully in the parent, with modules hidden inside something.
21 """
23 # Ensure all unit operations share the same group
24 simulation_objects = SimulationObject.objects.filter(pk__in=contained_objects_id).prefetch_related(
25 Prefetch(
26 "connectedPorts",
27 queryset=Port.objects.select_related("unitOp").prefetch_related()
28 )
29 )
30 unit_operations = (
31 simulation_objects.exclude(objectType=SimulationObjectClass.Stream)
32 .exclude(objectType=SimulationObjectClass.Recycle) # exclude recycle block because we want to propagate them like streams
33 .all()
34 )
35 streams = simulation_objects.filter(objectType=SimulationObjectClass.Stream)
36 group_ids = [g.id for g in simulation_objects.filter(objectType=SimulationObjectClass.Group)]
37 unitop_ids = [uo.id for uo in unit_operations]
39 #Remove orphaned streams
40 for stream in streams:
41 stream_group_ids = [g.simulationObject.pk for g in stream.get_groups()]
42 connected_unitop_ids = [p.unitOp.id for p in stream.connectedPorts.all()]
43 #check for orphan and catch case where stream is between two packed groups
44 if not bool(set(connected_unitop_ids).intersection(unitop_ids)) and not bool(set(stream_group_ids).intersection(group_ids)): 44 ↛ 45line 44 didn't jump to line 45 because the condition on line 44 was never true
45 contained_objects_id.remove(stream.id)
47 if not unit_operations.exists(): 47 ↛ 48line 47 didn't jump to line 48 because the condition on line 47 was never true
48 raise ValueError("No unit operations provided to determine group.")
50 # Get the common group from the unit operations
51 parent_group: Grouping = unit_operations.first().graphicObject.last().group
52 if any(unitop.graphicObject.last().group != parent_group for unitop in unit_operations): 52 ↛ 53line 52 didn't jump to line 53 because the condition on line 52 was never true
53 raise ValueError("All unit operations must belong to the same group.")
55 # Get all connections in current group within the contained objects
56 simulation_object_ids = [obj.pk for obj in simulation_objects]
57 selected_connections = [connection for connection in parent_group.get_connections() if
58 connection.unitOp in simulation_object_ids]
60 # Find intermediate streams which were not part of the initial selection (hanging streams)
61 no_duplicate_list = set()
62 hanging_streams = [connection.stream for connection in selected_connections if
63 connection.stream in no_duplicate_list or no_duplicate_list.add(connection.stream)]
64 # Add hanging streams to contained objects if not already included
65 for stream in hanging_streams:
66 if stream not in contained_objects_id:
67 contained_objects_id.append(stream)
69 # Include streams only if they belong to the same group
70 for unitop in unit_operations:
71 for port in unitop.ports.all():
72 if port.stream and port.stream.id not in contained_objects_id:
73 group_ids = [g.pk for g in port.stream.get_groups()]
74 if parent_group.pk in group_ids: 74 ↛ 71line 74 didn't jump to line 71 because the condition on line 74 was always true
75 contained_objects_id.append(port.stream.id)
77 # Get inlet and outlet streams connected to groups
78 in_out_streams : list[int] = SimulationObject.objects.annotate(port_count=Count('connectedPorts')).filter(pk__in=no_duplicate_list, port_count=1).values_list('pk', flat=True)
80 # Add inlet and outlet streams to contained objects if not already included
81 for stream in in_out_streams:
82 if stream not in contained_objects_id:
83 contained_objects_id.append(stream)
85 # Re-filter simulation objects after appending streams
86 simulation_objects = SimulationObject.objects.filter(pk__in=contained_objects_id)
88 flowsheet = simulation_objects.first().flowsheet
90 idx_for_type = ObjectTypeCounter.next_for(flowsheet, "module")
91 componentName = f"Module {idx_for_type}"
93 # Create the new group
94 simulation_object = simulation_objects.first()
95 flowsheet = simulation_object.flowsheet
96 new_group = Grouping.create(flowsheet, parent_group, componentName=componentName)
97 new_group.update_internal_simulation_objects(simulation_objects)
98 new_group.set_group_size()
100 inlet_streams = []
101 outlet_streams = []
103 # handle streams for all simulation objects
104 for simulation_object in simulation_objects:
106 # propagate streams connected to one port
107 if simulation_object.is_stream() and simulation_object.connectedPorts.count() == 1:
108 if simulation_object.connectedPorts.all()[0].direction == ConType.Inlet:
109 inlet_streams.append(simulation_object)
110 else:
111 outlet_streams.append(simulation_object)
113 # handle intermediate streams connected to the module
114 if simulation_object.is_stream() and simulation_object.connectedPorts.count() == 2:
115 propagate_intermediate_streams(simulation_object, contained_objects_id)
116 # if simulation_object.recycleData.exists():
117 # print("SimObj", simulation_object.recycleData)
119 propagate_streams(inlet_streams, ConType.Inlet)
120 propagate_streams(outlet_streams, ConType.Outlet)
122 # update zone of stream data entries
123 for unitop in unit_operations:
124 for stream_data_entry in unitop.StreamDataEntries.all():
125 stream_data_entry.group = new_group
126 stream_data_entry.save()
128 return new_group
131def propagate_streams(streams: List[SimulationObject], direction: ConType):
132 """
133 Propagates a stream's graphic objects to all parent groups of its connected unit operations.
135 Args:
136 simulation_object (SimulationObject): The stream to propagate.
137 """
138 if direction == ConType.Inlet:
139 x_offset = -1
140 else:
141 x_offset = 2
142 for simulation_object in streams:
143 y_offset = (0.5 + streams.index(simulation_object)) / len(streams)
144 if not simulation_object.is_stream(): 144 ↛ 145line 144 didn't jump to line 145 because the condition on line 144 was never true
145 pass
147 # make sure the stream is connected to one port (0 and 2 or more should not propagate)
148 connected_ports: QuerySet[Port]= simulation_object.connectedPorts.all()
149 if len(connected_ports) != 1: 149 ↛ 150line 149 didn't jump to line 150 because the condition on line 149 was never true
150 pass
152 # get the original graphic object for reference
153 original_stream_graphic_object : GraphicObject = simulation_object.graphicObject.first()
154 if not original_stream_graphic_object: 154 ↛ 155line 154 didn't jump to line 155 because the condition on line 154 was never true
155 pass
157 # propagate to all parent groups of the connected unit operation
158 unit_op : SimulationObject = connected_ports[0].unitOp
159 for parent_group in unit_op.get_parent_groups():
160 if not simulation_object.graphicObject.filter(group=parent_group).exists():
162 module_graphic_object = original_stream_graphic_object.group.get_graphic_object()
164 GraphicObject.objects.create(
165 flowsheet=simulation_object.flowsheet,
166 simulationObject=simulation_object,
167 width=original_stream_graphic_object.width,
168 height=original_stream_graphic_object.height,
169 x=module_graphic_object.x -12.5 + x_offset*module_graphic_object.width,
170 y=module_graphic_object.y -12.5 + y_offset*module_graphic_object.height,
171 group=parent_group,
172 )
175def propagate_intermediate_streams(simulation_object: SimulationObject, contained_objects_id: List[int]):
176 """
177 Handles the special case where an intermediate stream is connected to a module.
179 Args:
180 simulation_object (SimulationObject): The intermediate stream to propagate.
181 contained_objects_id (List[int]): The IDs of objects being abstracted.
182 """
183 if not simulation_object.is_stream(): 183 ↛ 184line 183 didn't jump to line 184 because the condition on line 183 was never true
184 return
186 # make sure the stream is an intermediate stream (connected to exactly two ports)
187 connected_ports = simulation_object.connectedPorts.all()
188 if len(connected_ports) != 2: 188 ↛ 189line 188 didn't jump to line 189 because the condition on line 188 was never true
189 return
191 # Count the number of connections to contained objects (unitops or modules)
192 connections_to_contained_objects = 0
193 for port in connected_ports:
194 # Check if the connected unitop is in the contained objects
195 if port.unitOp.id in contained_objects_id:
196 connections_to_contained_objects += 1
198 # get the original graphic object for reference
199 original_graphic_object = simulation_object.graphicObject.last()
200 if not original_graphic_object: 200 ↛ 201line 200 didn't jump to line 201 because the condition on line 200 was never true
201 return
204 def groups_to_add_intermediate():
205 """
206 Check if both ends of the intermediate stream are visible in the given group.
207 If they are, return the groups to which the intermediate stream should be added.
208 """
209 port1, port2 = connected_ports
211 def get_group_path(group):
212 path = []
213 while group:
214 path.append(group)
215 group = group.get_parent_group()
216 return path # [leaf, ..., root]
218 path1 = get_group_path(port1.unitOp.get_group())
219 path2 = get_group_path(port2.unitOp.get_group())
221 # The LCA is the first group in path1 that is also in path2
222 lca = next((group for group in path1 if group in path2), None)
223 if lca is None: 223 ↛ 224line 223 didn't jump to line 224 because the condition on line 223 was never true
224 return []
226 # Collect all groups from path1 up to and including the LCA
227 groups = []
228 for group in path1: 228 ↛ 233line 228 didn't jump to line 233 because the loop on line 228 didn't complete
229 groups.append(group)
230 if group == lca:
231 break
232 # Collect all groups from path2 up to but not including the LCA (avoid duplicates)
233 for group in path2: 233 ↛ 238line 233 didn't jump to line 238 because the loop on line 233 didn't complete
234 if group == lca:
235 break
236 if group not in groups: 236 ↛ 233line 236 didn't jump to line 233 because the condition on line 236 was always true
237 groups.append(group)
238 return groups
241 # we can use the groups_to_add_intermediate function to determine which groups to add the stream to
242 # we get the groups to put the intermediate streams so just add it to those groups
243 simulation_object.graphicObject.all().delete()
244 groups_to_add = groups_to_add_intermediate()
245 for group in groups_to_add:
246 if not simulation_object.graphicObject.filter(group=group).exists(): 246 ↛ 245line 246 didn't jump to line 245 because the condition on line 246 was always true
247 GraphicObject.objects.create(
248 flowsheet=simulation_object.flowsheet,
249 simulationObject=simulation_object,
250 width=original_graphic_object.width,
251 height=original_graphic_object.height,
252 x=original_graphic_object.x,
253 y=original_graphic_object.y,
254 group=group
255 )
257 # Creates recycle graphic objects in this group iteration if stream has a recycle connection
258 if simulation_object.has_recycle_connection:
259 # Get the recycle associated with this stream
260 recycle = simulation_object.recycleConnection
262 # Prevent duplicating the recycle graphic object in the same group
263 if recycle.simulationObject.graphicObject.filter(group=group).exists():
264 continue
266 # Set graphic objects for the recycle block in the same groups as the stream
267 default_graphic = recycle.simulationObject.graphicObject.last()
268 default_width = default_graphic.width if default_graphic else 32
269 default_height = default_graphic.height if default_graphic else 32\
271 # Make sure a stream graphic exists in this group
272 stream_graphic = simulation_object.graphicObject.filter(group=group).last()
273 if stream_graphic is None: 273 ↛ 274line 273 didn't jump to line 274 because the condition on line 273 was never true
274 continue
276 # Recreate the recycle graphic object
277 GraphicObject.objects.create(
278 flowsheet=simulation_object.flowsheet,
279 simulationObject=recycle.simulationObject,
280 width=default_width,
281 height=default_height,
282 x=stream_graphic.x + (stream_graphic.width - default_width) / 2,
283 y=stream_graphic.y + stream_graphic.height + 30,
284 group=group,
285 )