Coverage for backend/flowsheetInternals/graphicData/logic/make_group.py: 86%
115 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1from typing import List
2from core.auxiliary.enums.unitOpData import SimulationObjectClass
3from core.auxiliary.enums import ConType
4from flowsheetInternals.graphicData.models.groupingModel import Breadcrumbs, Grouping, GraphicObject
5from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
6import random
7from typing import List
8from core.auxiliary.models.ObjectTypeCounter import ObjectTypeCounter
def make_group(contained_objects_id: List[int]) -> Grouping:
    """
    Creates a new group (named "Module <n>") containing the given simulation objects.

    Streams in the selection that are connected to none of the selected
    unit operations and belong to none of the selected groups are dropped.
    Streams attached to a port of a selected unit operation are pulled in
    when they already belong to the unit operations' common parent group.

    Args:
        contained_objects_id (List[int]): Primary keys of the simulation objects
            to include in the group. NOTE: this list is modified in place
            (orphaned stream ids are removed, connected stream ids are appended).

    Returns:
        Grouping: The newly created group.

    Raises:
        ValueError: If the selection contains no non-stream objects, or if
            those objects do not all belong to the same group.
    """
    simulation_objects = SimulationObject.objects.filter(pk__in=contained_objects_id)
    # Everything that is not a stream (this includes already-packed groups).
    unit_operations = simulation_objects.exclude(objectType=SimulationObjectClass.Stream)
    streams = simulation_objects.filter(objectType=SimulationObjectClass.Stream)
    selected_group_ids = {
        g.id for g in simulation_objects.filter(objectType=SimulationObjectClass.Group)
    }
    unitop_ids = {uo.id for uo in unit_operations}

    # Remove orphaned streams
    for stream in streams:
        stream_group_ids = {g.simulationObject.pk for g in stream.get_groups()}
        connected_unitop_ids = {p.unitOp.id for p in stream.connectedPorts.all()}
        # A stream is an orphan when it touches no selected unit operation;
        # the second test keeps streams that sit between two packed groups.
        if connected_unitop_ids.isdisjoint(unitop_ids) and stream_group_ids.isdisjoint(selected_group_ids):
            contained_objects_id.remove(stream.id)

    if not unit_operations.exists():
        raise ValueError("No unit operations provided to determine group.")

    # Ensure all unit operations share the same group
    parent_group = unit_operations.first().graphicObject.last().group
    if any(unitop.graphicObject.last().group != parent_group for unitop in unit_operations):
        raise ValueError("All unit operations must belong to the same group.")

    # Include connected streams only if they belong to the same group
    for unitop in unit_operations:
        for port in unitop.ports.all():
            if port.stream and port.stream.id not in contained_objects_id:
                stream_parent_ids = [g.pk for g in port.stream.get_groups()]
                if parent_group.pk in stream_parent_ids:
                    contained_objects_id.append(port.stream.id)

    # Re-filter simulation objects now that the id list has been adjusted
    simulation_objects = SimulationObject.objects.filter(pk__in=contained_objects_id)
    flowsheet = simulation_objects.first().flowsheet

    idx_for_type = ObjectTypeCounter.next_for(flowsheet, "module")
    componentName = f"Module {idx_for_type}"

    # Create the new group
    new_group = Grouping.create(flowsheet, parent_group, componentName=componentName)
    new_group.update_internal_simulation_objects(simulation_objects)
    new_group.set_group_size()

    inlet_streams = []
    outlet_streams = []

    # Classify the streams by how many ports they are connected to
    for simulation_object in simulation_objects:
        if not simulation_object.is_stream():
            continue
        port_count = simulation_object.connectedPorts.count()
        if port_count == 1:
            # Streams connected to a single port are propagated as inlets/outlets
            if simulation_object.connectedPorts.all()[0].direction == ConType.Inlet:
                inlet_streams.append(simulation_object)
            else:
                outlet_streams.append(simulation_object)
        elif port_count == 2:
            # Intermediate streams connected to the module
            propagate_intermediate_streams(simulation_object, contained_objects_id)

    propagate_streams(inlet_streams, ConType.Inlet)
    propagate_streams(outlet_streams, ConType.Outlet)

    # Point the stream data entries of the grouped unit operations at the new group
    for unitop in unit_operations:
        for stream_data_entry in unitop.StreamDataEntries.all():
            stream_data_entry.group = new_group
            stream_data_entry.save()

    return new_group
def propagate_streams(streams: List[SimulationObject], direction: ConType):
    """
    Propagates each stream's graphic object to all parent groups of its connected unit operation.

    A graphic object is created in every parent group that does not already
    contain one for the stream. New objects are positioned relative to the
    graphic object of the group that holds the stream's original graphic
    object: inlets use a horizontal offset factor of -1, all other directions
    use 2, and the streams are spread evenly along the group's height in
    list order.

    Entries that are not streams, are not connected to exactly one port, or
    have no graphic object are skipped.

    Args:
        streams (List[SimulationObject]): The streams to propagate.
        direction (ConType): Direction shared by all of the given streams.
    """
    x_offset = -1 if direction == ConType.Inlet else 2
    stream_count = len(streams)

    for position, simulation_object in enumerate(streams):
        y_offset = (0.5 + position) / stream_count

        if not simulation_object.is_stream():
            continue

        # make sure the stream is connected to one port (0 and 2 or more should not propagate)
        connected_ports = simulation_object.connectedPorts.all()
        if len(connected_ports) != 1:
            continue

        # get the original graphic object for reference
        original_graphic_object = simulation_object.graphicObject.first()
        if not original_graphic_object:
            continue

        # propagate to all parent groups of the connected unit operation
        unit_op = connected_ports[0].unitOp
        for parent_group in unit_op.get_parent_groups():
            if simulation_object.graphicObject.filter(group=parent_group).exists():
                continue
            # Graphic object of the group holding the original; used as the placement anchor.
            anchor = original_graphic_object.group.get_graphic_object()
            GraphicObject.objects.create(
                flowsheet=simulation_object.flowsheet,
                simulationObject=simulation_object,
                width=original_graphic_object.width,
                height=original_graphic_object.height,
                x=anchor.x - 12.5 + x_offset * anchor.width,
                y=anchor.y - 12.5 + y_offset * anchor.height,
                group=parent_group,
            )
def _group_path_to_root(group):
    """Return the chain of groups from ``group`` up through its parents: [leaf, ..., root]."""
    path = []
    while group:
        path.append(group)
        group = group.get_parent_group()
    return path


def _groups_spanned_by_ports(port1, port2):
    """
    Return the groups lying between the two ports' unit operations.

    The result holds every group from the first unit operation's group up to
    and including the lowest common ancestor (LCA), followed by the groups
    from the second unit operation's group up to but excluding the LCA.
    Returns an empty list when the two group paths share no group.
    """
    path1 = _group_path_to_root(port1.unitOp.get_group())
    path2 = _group_path_to_root(port2.unitOp.get_group())

    # The LCA is the first group in path1 that is also in path2
    lca = next((group for group in path1 if group in path2), None)
    if lca is None:
        return []

    groups = []
    for group in path1:
        groups.append(group)
        if group == lca:
            break
    for group in path2:
        if group == lca:
            break
        if group not in groups:
            groups.append(group)
    return groups


def propagate_intermediate_streams(simulation_object: SimulationObject, contained_objects_id: List[int]):
    """
    Handles the special case where an intermediate stream is connected to a module.

    The stream's existing graphic objects are replaced by one graphic object
    in each group between its two connected unit operations (see
    ``_groups_spanned_by_ports``). Each new object copies the size and
    position of the stream's last graphic object.

    Does nothing when the object is not a stream, is not connected to exactly
    two ports, or has no graphic object.

    Args:
        simulation_object (SimulationObject): The intermediate stream to propagate.
        contained_objects_id (List[int]): The IDs of objects being abstracted.
            Currently unused; kept so existing callers keep working.
    """
    if not simulation_object.is_stream():
        return

    # make sure the stream is an intermediate stream (connected to exactly two ports)
    connected_ports = simulation_object.connectedPorts.all()
    if len(connected_ports) != 2:
        return

    # get the original graphic object for reference
    original_graphic_object = simulation_object.graphicObject.last()
    if not original_graphic_object:
        return

    # Resolve the target groups before deleting anything, so that a failure
    # while walking the group hierarchy does not leave the stream without
    # any graphic objects.
    port1, port2 = connected_ports
    groups_to_add = _groups_spanned_by_ports(port1, port2)

    simulation_object.graphicObject.all().delete()
    for group in groups_to_add:
        if not simulation_object.graphicObject.filter(group=group).exists():
            GraphicObject.objects.create(
                flowsheet=simulation_object.flowsheet,
                simulationObject=simulation_object,
                width=original_graphic_object.width,
                height=original_graphic_object.height,
                x=original_graphic_object.x,
                y=original_graphic_object.y,
                group=group,
            )