Coverage for backend/django/flowsheetInternals/unitops/viewsets/DuplicateSimulationObject.py: 92%
215 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-12-18 04:00 +0000
1from django.db import transaction
2from django.db.models import Prefetch
3from itertools import chain
4from core.auxiliary.enums.unitOpData import SimulationObjectClass
5from core.auxiliary.models.PropertySet import PropertySet
6from core.auxiliary.models.PropertyInfo import PropertyInfo
7from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate
8from core.auxiliary.models.IndexedItem import IndexedItem
9from core.auxiliary.models.RecycleData import RecycleData
10from flowsheetInternals.propertyPackages.models.SimulationObjectPropertyPackages import SimulationObjectPropertyPackages
11from flowsheetInternals.unitops.models.Port import Port
12from flowsheetInternals.graphicData.models.groupingModel import Grouping
13from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject
14from flowsheetInternals.unitops.models import SimulationObject
class Coords:
    """Lightweight 2D coordinate pair used for graphic-object positioning."""

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __repr__(self):
        # Backward-compatible addition: debug-friendly representation only.
        return f"{type(self).__name__}(x={self.x!r}, y={self.y!r})"
def calc_centre_simulation_objects(simulation_object_collection):
    """Return the bounding-box centre of the graphic objects attached to the
    given simulation objects, as a ``Coords``.

    Raises:
        ValueError: if none of the simulation objects have a graphic object.
            (Previously this surfaced as an opaque ``TypeError`` because
            aggregates over an empty queryset come back as ``None``.)
    """
    from django.db.models import Min, Max, F, ExpressionWrapper, FloatField
    aggregated = GraphicObject.objects.filter(
        simulationObject__in=simulation_object_collection
    ).aggregate(
        min_x=Min('x'),
        max_edge_x=Max(ExpressionWrapper(F('x') + F('width'), output_field=FloatField())),
        min_y=Min('y'),
        max_edge_y=Max(ExpressionWrapper(F('y') + F('height'), output_field=FloatField())),
    )
    # Guard the empty-queryset case before doing arithmetic on the aggregates.
    if aggregated['min_x'] is None or aggregated['min_y'] is None:
        raise ValueError(
            "Cannot compute centre: no graphic objects found for the given simulation objects"
        )
    centre_x = (aggregated['min_x'] + aggregated['max_edge_x']) / 2
    centre_y = (aggregated['min_y'] + aggregated['max_edge_y']) / 2
    return Coords(centre_x, centre_y)
class SimulationObjectDuplicator:
    """Builds duplicates of simulation objects together with all their related
    data: groupings, graphics, ports, recycle data, property sets and property
    packages.

    Most methods return unsaved model instances and leave the ``bulk_create``
    to the caller; methods that need primary keys for follow-up rows perform
    their own ``bulk_create``.
    """

    def __init__(self, flowsheet_id):
        # Every duplicated row is stamped with this flowsheet id.
        self.flowsheet_id = flowsheet_id

    def create_duplicate_simulation_objects(self, original_simulation_objects):
        """Create unsaved copies of the given simulation objects.

        Returns:
            tuple: ``(original_to_duplicate_map, new_simulation_objects)`` —
            a mapping from each original instance to its unsaved copy, and the
            copies in input order (for ``bulk_create`` by the caller).
        """
        original_to_duplicate_map = {}
        new_simulation_objects = []
        for original in original_simulation_objects:
            duplicate = SimulationObject(
                flowsheet=original.flowsheet,
                objectType=original.objectType,
                componentName=original.componentName + ' copy',
            )
            original_to_duplicate_map[original] = duplicate
            new_simulation_objects.append(duplicate)
        return original_to_duplicate_map, new_simulation_objects

    def duplicate_groupings(self, original_simulation_objects, original_to_duplicate_map):
        """Duplicate the Grouping of every Group-type simulation object.

        Returns:
            dict: original ``Grouping`` pk -> newly created ``Grouping``.
        """
        grouping_map = {}
        for original in original_simulation_objects:
            if original.objectType != SimulationObjectClass.Group:
                continue
            original_grouping = getattr(original, "grouping", None)
            if original_grouping is None:
                continue
            new_grouping = Grouping.objects.create(
                flowsheet=original_grouping.flowsheet,
                simulationObject=original_to_duplicate_map[original],
                abstractionType=original_grouping.abstractionType,
            )
            grouping_map[original_grouping.pk] = new_grouping
            # NOTE(review): this shares the ORIGINAL PropertyInfo rows with the
            # new grouping rather than duplicating them — confirm intended.
            property_infos = list(original_grouping.propertyInfos.all())
            if property_infos:
                new_grouping.propertyInfos.set(property_infos)
        return grouping_map

    def duplicate_graphics(self, delta, original_simulation_objects, original_to_duplicate_map, grouping_map):
        """Build unsaved GraphicObject copies offset by ``delta``, and unsaved
        PropertySet copies for objects that have one.

        Returns:
            tuple: ``(all_graphics, property_set_map)`` where
            ``property_set_map`` maps original PropertySet pk -> unsaved copy.
        """
        all_graphics = []
        property_set_map = {}
        delta_x = delta.x if delta else 0
        delta_y = delta.y if delta else 0
        for original in original_simulation_objects:
            duplicate = original_to_duplicate_map[original]
            # EAFP: not every simulation object has a property set.
            try:
                old_set = original.properties
            except PropertySet.DoesNotExist:
                old_set = None

            if old_set is not None:
                property_set_map[old_set.pk] = PropertySet(
                    simulationObject=duplicate, flowsheet_id=self.flowsheet_id
                )

            for graphic_object in original.graphicObject.all():
                if graphic_object is None:
                    continue
                # Re-parent into the duplicated group when the group was part
                # of the selection; otherwise keep the original group.
                new_group = grouping_map.get(graphic_object.group_id, graphic_object.group)
                all_graphics.append(GraphicObject(
                    simulationObject=duplicate,
                    width=graphic_object.width,
                    height=graphic_object.height,
                    x=graphic_object.x + delta_x,
                    y=graphic_object.y + delta_y,
                    group=new_group,
                    flowsheet_id=self.flowsheet_id,
                    visible=graphic_object.visible,
                    rotation=graphic_object.rotation,
                    flipped=graphic_object.flipped,
                ))
        return all_graphics, property_set_map

    def duplicate_port_data(self, original_simulation_objects, original_to_duplicate_map):
        """Build unsaved Port copies attached to the duplicated unit ops.

        Returns:
            tuple: ``(all_ports_to_create, all_ports_updates, port_map)``.
            ``all_ports_updates`` is always empty and is kept only for
            interface compatibility with existing callers; ``port_map`` maps
            original Port pk -> unsaved copy.
        """
        all_ports_to_create = []
        all_ports_updates = []  # kept for backward compatibility; never filled here
        port_map = {}
        original_ports = Port.objects.filter(
            unitOp__in=original_simulation_objects
        ).select_related('unitOp')
        for port in original_ports:
            new_port = Port(
                displayName=port.displayName,
                key=port.key,
                index=port.index,
                direction=port.direction,
                unitOp=original_to_duplicate_map[port.unitOp],
                flowsheet_id=self.flowsheet_id,
            )
            port_map[port.pk] = new_port
            all_ports_to_create.append(new_port)
        return all_ports_to_create, all_ports_updates, port_map

    def duplicate_recycle_data(self, original_simulation_objects, original_to_duplicate_map):
        """Duplicate RecycleData rows (bulk-created here so pks exist).

        Returns:
            list: ``(new_recycle_data, new_tear_object)`` pairs for
            ``apply_recycle_updates``.
        """
        recycle_data_to_create = []
        recycle_updates = []
        for original in original_simulation_objects:
            # EAFP: only some simulation objects carry recycle data.
            try:
                original_recycle_data = original.recycleData
            except RecycleData.DoesNotExist:
                continue

            duplicate = original_to_duplicate_map.get(original)
            if not duplicate:
                continue

            tear_object = original_recycle_data.tearObject
            new_tear_object = original_to_duplicate_map.get(tear_object) if tear_object else None

            new_recycle_data = RecycleData(
                flowsheet_id=self.flowsheet_id,
                simulationObject=duplicate,
                tearObject=new_tear_object,
            )
            recycle_data_to_create.append(new_recycle_data)
            recycle_updates.append((new_recycle_data, new_tear_object))

        if recycle_data_to_create:
            RecycleData.objects.bulk_create(recycle_data_to_create)

        return recycle_updates

    def apply_recycle_updates(self, recycle_updates):
        """Apply deferred tear-object updates after all duplicates are saved."""
        for recycle_data, tear_object in recycle_updates:
            if tear_object is not None:
                recycle_data.update(tear_object)

    def update_streams(self, original_simulation_objects, original_to_duplicate_map, port_map):
        """Point duplicated ports at the duplicated streams.

        Returns:
            list: duplicated ports whose ``stream`` field was set (for
            ``bulk_update`` by the caller).
        """
        connected_ports = list(chain.from_iterable(
            list(sim_obj.connectedPorts.all()) for sim_obj in original_simulation_objects
        ))
        all_ports_updates = []
        for connected_port in connected_ports:
            if connected_port.unitOp not in original_to_duplicate_map:
                continue
            new_unitOp = original_to_duplicate_map[connected_port.unitOp]
            new_port = port_map.get(connected_port.pk)
            if new_port:
                # NOTE(review): when the original stream was not part of the
                # selection this falls back to the port's own duplicated unit
                # op — looks suspicious; confirm intended.
                new_port.stream = original_to_duplicate_map.get(connected_port.stream, new_unitOp)
                all_ports_updates.append(new_port)
        return all_ports_updates

    def duplicate_properties(self, property_set_map):
        """Deep-copy property sets: their infos, values, indexed items and the
        value<->item intermediate links.

        Args:
            property_set_map: original PropertySet pk -> unsaved new
                PropertySet (attached to the duplicated simulation objects).
        """
        PropertySet.objects.bulk_create(list(property_set_map.values()))

        original_set_ids = list(property_set_map.keys())
        original_property_infos = list(PropertyInfo.objects.filter(set_id__in=original_set_ids))
        original_property_values = list(
            PropertyValue.objects
            .filter(property_id__in=[info.pk for info in original_property_infos])
            .prefetch_related('indexedItems')
        )

        # Map each original property value pk to the pks of its indexed items.
        original_indexed_item_map = {}
        for value in original_property_values:
            original_indexed_item_map.setdefault(value.pk, []).extend(
                item.pk for item in value.indexedItems.all()
            )

        # Duplicate property infos.
        property_info_map = {}
        for info in original_property_infos:
            property_info_map[info.pk] = PropertyInfo(
                set=property_set_map[info.set_id],
                type=info.type,
                unitType=info.unitType,
                unit=info.unit,
                expression=info.expression,
                key=info.key,
                displayName=info.displayName,
                index=info.index,
                flowsheet_id=self.flowsheet_id,
            )
        PropertyInfo.objects.bulk_create(list(property_info_map.values()))

        # Duplicate property values.
        value_map = {}
        for value in original_property_values:
            value_map[value.pk] = PropertyValue(
                enabled=value.enabled,
                value=value.value,
                displayValue=value.displayValue,
                formula=value.formula,
                property=property_info_map[value.property_id],
                flowsheet_id=self.flowsheet_id,
            )
        PropertyValue.objects.bulk_create(list(value_map.values()))

        # Determine, per indexed item, which duplicated simulation object owns
        # it: the owner of the (duplicated) property value that references it.
        # The previous code always used the first entry of value_map as the
        # owner for EVERY indexed item — wrong when several simulation objects
        # are duplicated at once — and crashed when value_map was empty.
        indexed_item_owner_map = {}
        for original_value_pk, item_pks in original_indexed_item_map.items():
            owner = value_map[original_value_pk].property.set.simulationObject
            for item_pk in item_pks:
                indexed_item_owner_map.setdefault(item_pk, owner)

        # Duplicate the indexed items with their correct owners.
        indexed_item_map = {}
        original_items = IndexedItem.objects.filter(pk__in=set(indexed_item_owner_map))
        for original_item in original_items:
            indexed_item_map[original_item.pk] = IndexedItem(
                owner=indexed_item_owner_map[original_item.pk],
                key=original_item.key,
                displayName=original_item.displayName,
                type=original_item.type,
                flowsheet_id=self.flowsheet_id,
            )
        IndexedItem.objects.bulk_create(list(indexed_item_map.values()))

        # Recreate the m2m links between duplicated values and indexed items.
        all_intermediates = [
            PropertyValueIntermediate(
                propertyvalue_id=value_map[original_value_pk].pk,
                indexeditem_id=indexed_item_map[item_pk].pk,
            )
            for original_value_pk, item_pks in original_indexed_item_map.items()
            for item_pk in item_pks
        ]
        PropertyValueIntermediate.objects.bulk_create(all_intermediates)

    def duplicate_packages(self, original_simulation_objects, original_to_duplicate_map):
        """Duplicate the property-package links of each simulation object."""
        new_property_packages = []
        for original in original_simulation_objects:
            duplicate = original_to_duplicate_map[original]
            for original_package in original.propertyPackages.all():
                new_property_packages.append(
                    SimulationObjectPropertyPackages(
                        simulationObject=duplicate,
                        name=original_package.name,
                        propertyPackage=original_package.propertyPackage,
                        flowsheet_id=self.flowsheet_id,
                    )
                )
        SimulationObjectPropertyPackages.objects.bulk_create(new_property_packages)
class DuplicateSimulationObject:
    """Handles a duplication request for a selection of simulation objects."""

    def handle_duplication_request(self, flowsheet: int, validated_data):
        """Duplicate the simulation objects listed in ``validated_data['objectIDs']``.

        Members of selected groups are pulled in recursively, all related
        data is copied inside a single transaction, and the duplicated
        graphics are re-centred on the requested ``(x, y)`` position when one
        is supplied (a missing position now means "no offset" instead of the
        previous TypeError on None arithmetic).
        """
        object_ids = validated_data.get('objectIDs') or []
        if not object_ids:
            return

        with transaction.atomic():
            expanded_ids = self._expand_object_ids(object_ids)
            if not expanded_ids:
                return

            original_simulation_objects = list(
                SimulationObject.objects
                .filter(pk__in=expanded_ids)
                .select_related('flowsheet', 'grouping')
                .prefetch_related(
                    'properties',
                    'propertyPackages',
                    'graphicObject',
                    Prefetch('connectedPorts', queryset=Port.objects.select_related('unitOp', 'stream')),
                    Prefetch('grouping__graphicObjects', queryset=GraphicObject.objects.select_related('simulationObject')),
                    'grouping__propertyInfos',
                )
            )
            if not original_simulation_objects:
                return

            # Offset so the duplicates' bounding-box centre lands on the
            # requested target position.
            target_x = validated_data.get('x')
            target_y = validated_data.get('y')
            if target_x is None or target_y is None:
                delta = Coords(0, 0)
            else:
                old_centre = calc_centre_simulation_objects(original_simulation_objects)
                delta = Coords(target_x - old_centre.x, target_y - old_centre.y)

            # Create the bare duplicates first so every later step can refer
            # to saved rows.
            duplicator = SimulationObjectDuplicator(flowsheet)
            original_to_duplicate_map, new_simulation_objects = duplicator.create_duplicate_simulation_objects(
                original_simulation_objects
            )
            SimulationObject.objects.bulk_create(new_simulation_objects)

            recycle_updates = duplicator.duplicate_recycle_data(original_simulation_objects, original_to_duplicate_map)
            grouping_map = duplicator.duplicate_groupings(original_simulation_objects, original_to_duplicate_map)

            # Graphics, property sets and packages.
            all_graphics, property_set_map = duplicator.duplicate_graphics(
                delta, original_simulation_objects, original_to_duplicate_map, grouping_map
            )
            duplicator.duplicate_packages(original_simulation_objects, original_to_duplicate_map)

            # Ports, then stream re-wiring once the ports exist.
            all_ports_to_create, _all_ports_updates, port_map = duplicator.duplicate_port_data(
                original_simulation_objects, original_to_duplicate_map
            )
            Port.objects.bulk_create(all_ports_to_create)

            ports_updates = duplicator.update_streams(original_simulation_objects, original_to_duplicate_map, port_map)
            Port.objects.bulk_update(ports_updates, ['stream'])

            GraphicObject.objects.bulk_create(all_graphics)
            duplicator.duplicate_properties(property_set_map)
            duplicator.apply_recycle_updates(recycle_updates)

            # NOTE(review): flowsheet/objectType were set at creation and are
            # not modified afterwards, so this bulk_update looks redundant —
            # confirm before removing.
            simulation_objects_to_update = list(original_to_duplicate_map.values())
            SimulationObject.objects.bulk_update(simulation_objects_to_update, ['flowsheet', 'objectType'])

    def _expand_object_ids(self, object_ids):
        """Recursively collect all simulation objects contained within selected groups."""
        if not object_ids:
            return set()

        discovered = set()
        queue = set(object_ids)

        # Breadth-first expansion: each pass resolves one level of grouping.
        while queue:
            batch_ids = list(queue)
            queue.clear()
            queryset = (
                SimulationObject.objects
                .filter(pk__in=batch_ids)
                .select_related('grouping')
                .prefetch_related(
                    Prefetch(
                        'grouping__graphicObjects',
                        queryset=GraphicObject.objects.select_related('simulationObject'),
                    )
                )
            )

            for simulation_object in queryset:
                if simulation_object.pk in discovered:
                    continue
                discovered.add(simulation_object.pk)

                grouping = getattr(simulation_object, 'grouping', None)
                if simulation_object.objectType == SimulationObjectClass.Group and grouping is not None:
                    for graphic_object in grouping.graphicObjects.all():
                        child = graphic_object.simulationObject
                        if child and child.pk not in discovered:
                            queue.add(child.pk)

        return discovered