Coverage for backend/django/flowsheetInternals/graphicData/logic/make_group.py: 88%
144 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-03-26 20:57 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-03-26 20:57 +0000
1from typing import List
2from core.auxiliary.enums.unitOpData import SimulationObjectClass
3from core.auxiliary.enums import ConType
4from flowsheetInternals.graphicData.models.groupingModel import Breadcrumbs, Grouping, GraphicObject
5from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
6import random
7from typing import List
8from core.auxiliary.models.ObjectTypeCounter import ObjectTypeCounter
9from django.db.models import Count, QuerySet, Prefetch
10from flowsheetInternals.unitops.models.Port import Port
def make_group(contained_objects_id: List[int]) -> Grouping:
    """
    Creates a group (module) from the selected simulation objects.

    The selection is adjusted before the group is created:
      * selected streams that are connected to none of the selected unit
        operations and belong to none of the selected groups are dropped;
      * streams that appear on two of the selected connections of the parent
        group are added;
      * streams on the ports of the selected unit operations are added when
        they belong to the parent group;
      * streams seen on the selected connections that have exactly one
        connected port are added.

    Args:
        contained_objects_id (List[SimulationObject id]): Simulation objects to
            include in the group. The list is copied, the caller's list is
            never modified.

    Returns:
        Grouping: The newly created group.

    Raises:
        ValueError: If the selection holds no unit operation, or if the selected
            unit operations do not all belong to the same group.
    """
    # Work on a private copy so the caller's list is not mutated as a side effect.
    contained_ids = list(contained_objects_id)

    simulation_objects = SimulationObject.objects.filter(pk__in=contained_ids).prefetch_related(
        Prefetch(
            "connectedPorts",
            queryset=Port.objects.select_related("unitOp").prefetch_related()
        )
    )
    # Recycle blocks are excluded because we want to propagate them like streams.
    # Materialised once: the result is iterated several times below.
    unit_operations = list(
        simulation_objects.exclude(objectType=SimulationObjectClass.Stream)
        .exclude(objectType=SimulationObjectClass.Recycle)
    )
    if not unit_operations:
        raise ValueError("No unit operations provided to determine group.")

    streams = simulation_objects.filter(objectType=SimulationObjectClass.Stream)
    selected_group_ids = [g.id for g in simulation_objects.filter(objectType=SimulationObjectClass.Group)]
    unitop_ids = [uo.id for uo in unit_operations]

    # Remove orphaned streams. The group check catches the case where a stream
    # sits between two packed groups.
    orphan_stream_ids = set()
    for stream in streams:
        stream_group_ids = {g.simulationObject.pk for g in stream.get_groups()}
        connected_unitop_ids = {p.unitOp.id for p in stream.connectedPorts.all()}
        if connected_unitop_ids.isdisjoint(unitop_ids) and stream_group_ids.isdisjoint(selected_group_ids):
            orphan_stream_ids.add(stream.id)
    if orphan_stream_ids:
        contained_ids = [pk for pk in contained_ids if pk not in orphan_stream_ids]

    # Ensure all unit operations share the same group
    parent_group: Grouping = unit_operations[0].graphicObject.last().group
    if any(unitop.graphicObject.last().group != parent_group for unitop in unit_operations[1:]):
        raise ValueError("All unit operations must belong to the same group.")

    # Get all connections in current group within the contained objects
    simulation_object_ids = [obj.pk for obj in simulation_objects]
    selected_connections = [
        connection for connection in parent_group.get_connections()
        if connection.unitOp in simulation_object_ids
    ]

    # Find intermediate streams which were not part of the initial selection
    # (hanging streams): a stream met a second time on the selected connections.
    seen_streams = set()
    hanging_streams = []
    for connection in selected_connections:
        if connection.stream in seen_streams:
            hanging_streams.append(connection.stream)
        else:
            seen_streams.add(connection.stream)

    # Add hanging streams to contained objects if not already included
    for stream in hanging_streams:
        if stream not in contained_ids:
            contained_ids.append(stream)

    # Include streams only if they belong to the same group
    for unitop in unit_operations:
        for port in unitop.ports.all():
            if port.stream and port.stream.id not in contained_ids:
                stream_parent_ids = [g.pk for g in port.stream.get_groups()]
                if parent_group.pk in stream_parent_ids:
                    contained_ids.append(port.stream.id)

    # Get inlet and outlet streams connected to groups (exactly one connected port)
    in_out_stream_ids = (
        SimulationObject.objects.annotate(port_count=Count('connectedPorts'))
        .filter(pk__in=seen_streams, port_count=1)
        .values_list('pk', flat=True)
    )

    # Add inlet and outlet streams to contained objects if not already included
    for stream_id in in_out_stream_ids:
        if stream_id not in contained_ids:
            contained_ids.append(stream_id)

    # Re-filter simulation objects after appending streams
    simulation_objects = SimulationObject.objects.filter(pk__in=contained_ids)

    flowsheet = simulation_objects.first().flowsheet

    idx_for_type = ObjectTypeCounter.next_for(flowsheet, "module")
    componentName = f"Module {idx_for_type}"

    # Create the new group
    new_group = Grouping.create(flowsheet, parent_group, componentName=componentName)
    new_group.update_internal_simulation_objects(simulation_objects)
    new_group.set_group_size()

    inlet_streams = []
    outlet_streams = []

    # handle streams for all simulation objects
    for simulation_object in simulation_objects:
        if not simulation_object.is_stream():
            continue

        # Fetch the ports once instead of issuing a count per check.
        ports = list(simulation_object.connectedPorts.all())

        if len(ports) == 1:
            # propagate streams connected to one port
            if ports[0].direction == ConType.Inlet:
                inlet_streams.append(simulation_object)
            else:
                outlet_streams.append(simulation_object)
        elif len(ports) == 2:
            # handle intermediate streams connected to the module
            propagate_intermediate_streams(simulation_object, contained_ids)

    propagate_streams(inlet_streams, ConType.Inlet)
    propagate_streams(outlet_streams, ConType.Outlet)

    # update zone of stream data entries
    for unitop in unit_operations:
        for stream_data_entry in unitop.StreamDataEntries.all():
            stream_data_entry.group = new_group
            stream_data_entry.save()

    return new_group
def propagate_streams(streams: List[SimulationObject], direction: ConType):
    """
    Propagates each stream's graphic object to all parent groups of its connected unit operation.

    For every parent group of the connected unit operation that does not yet
    hold a graphic object for the stream, one is created. Its size is copied
    from the stream's first graphic object and its position is computed from
    the graphic object of the group that holds that first graphic object.

    Entries that are not streams, that are not connected to exactly one port,
    or that have no graphic object are skipped.

    Args:
        streams (List[SimulationObject]): The streams to propagate. The position
            of a stream in this list determines its vertical offset.
        direction (ConType): ConType.Inlet uses a horizontal offset of -1 module
            widths, any other value uses 2 module widths.
    """
    x_offset = -1 if direction == ConType.Inlet else 2

    stream_count = len(streams)
    # space out the streams if there are too many of them
    spread_out = stream_count > 2
    y_gap = -12.5 * (2 * stream_count - 5) if spread_out else -12.5

    # enumerate() gives each entry its own slot without a linear search per stream.
    for position, simulation_object in enumerate(streams):
        if not simulation_object.is_stream():
            continue

        # make sure the stream is connected to one port (0 and 2 or more should not propagate)
        connected_ports = list(simulation_object.connectedPorts.all())
        if len(connected_ports) != 1:
            continue

        # get the original graphic object for reference
        original_stream_graphic_object: GraphicObject = simulation_object.graphicObject.first()
        if not original_stream_graphic_object:
            continue

        if spread_out:
            y_offset = 0.5 * position
        else:
            y_offset = (0.5 + position) / stream_count

        # propagate to all parent groups of the connected unit operation
        unit_op: SimulationObject = connected_ports[0].unitOp
        for parent_group in unit_op.get_parent_groups():
            if simulation_object.graphicObject.filter(group=parent_group).exists():
                continue

            module_graphic_object = original_stream_graphic_object.group.get_graphic_object()

            GraphicObject.objects.create(
                flowsheet=simulation_object.flowsheet,
                simulationObject=simulation_object,
                width=original_stream_graphic_object.width,
                height=original_stream_graphic_object.height,
                x=module_graphic_object.x - 12.5 + x_offset * module_graphic_object.width,
                y=module_graphic_object.y + y_gap + y_offset * module_graphic_object.height,
                group=parent_group,
            )
def propagate_intermediate_streams(simulation_object: SimulationObject, contained_objects_id: List[int]):
    """
    Handles the special case where an intermediate stream is connected to a module.

    The stream's graphic objects are replaced: all existing ones are deleted and
    a copy of its last graphic object is created in every group lying on the
    path between the groups of its two connected unit operations, up to and
    including their lowest common ancestor group. When the stream has a recycle
    connection, a graphic object for the recycle block is created below the
    stream graphic in each of those groups that does not already hold one.

    Nothing happens when the object is not a stream, is not connected to
    exactly two ports, or has no graphic object.

    Args:
        simulation_object (SimulationObject): The intermediate stream to propagate.
        contained_objects_id (List[int]): The IDs of objects being abstracted.
            Currently not used by this function; kept so existing callers keep working.
    """
    if not simulation_object.is_stream():
        return

    # make sure the stream is an intermediate stream (connected to exactly two ports)
    connected_ports = list(simulation_object.connectedPorts.all())
    if len(connected_ports) != 2:
        return

    # get the original graphic object for reference
    original_graphic_object = simulation_object.graphicObject.last()
    if not original_graphic_object:
        return

    def get_group_path(group):
        """Return the chain of groups from `group` up to the root: [leaf, ..., root]."""
        path = []
        while group:
            path.append(group)
            group = group.get_parent_group()
        return path

    def groups_to_add_intermediate():
        """
        Return the groups in which the intermediate stream should get a graphic
        object: every group from each end of the stream up to their lowest
        common ancestor (LCA), the LCA itself included once.
        """
        port1, port2 = connected_ports
        path1 = get_group_path(port1.unitOp.get_group())
        path2 = get_group_path(port2.unitOp.get_group())

        # The LCA is the first group in path1 that is also in path2
        lca = next((group for group in path1 if group in path2), None)
        if lca is None:
            return []

        # Collect all groups from path1 up to and including the LCA
        groups = []
        for group in path1:
            groups.append(group)
            if group == lca:
                break
        # Collect all groups from path2 up to but not including the LCA (avoid duplicates)
        for group in path2:
            if group == lca:
                break
            if group not in groups:
                groups.append(group)
        return groups

    # Resolve the target groups before deleting anything, so that a failure while
    # walking the group hierarchy does not leave the stream without graphic objects.
    groups_to_add = groups_to_add_intermediate()
    simulation_object.graphicObject.all().delete()

    for group in groups_to_add:
        if simulation_object.graphicObject.filter(group=group).exists():
            continue

        stream_graphic = GraphicObject.objects.create(
            flowsheet=simulation_object.flowsheet,
            simulationObject=simulation_object,
            width=original_graphic_object.width,
            height=original_graphic_object.height,
            x=original_graphic_object.x,
            y=original_graphic_object.y,
            group=group
        )

        # Creates recycle graphic objects in this group iteration if stream has a recycle connection
        if not simulation_object.has_recycle_connection:
            continue

        # Get the recycle associated with this stream
        recycle_object = simulation_object.recycleConnection.simulationObject

        # Prevent duplicating the recycle graphic object in the same group
        if recycle_object.graphicObject.filter(group=group).exists():
            continue

        # Reuse the size of the recycle block's last graphic object, falling back to 32x32
        default_graphic = recycle_object.graphicObject.last()
        default_width = default_graphic.width if default_graphic else 32
        default_height = default_graphic.height if default_graphic else 32

        # Recreate the recycle graphic object, horizontally centred on the
        # stream graphic and 30 units below its bottom edge
        GraphicObject.objects.create(
            flowsheet=simulation_object.flowsheet,
            simulationObject=recycle_object,
            width=default_width,
            height=default_height,
            x=stream_graphic.x + (stream_graphic.width - default_width) / 2,
            y=stream_graphic.y + stream_graphic.height + 30,
            group=group,
        )