Coverage for backend/django/flowsheetInternals/unitops/viewsets/DuplicateSimulationObject.py: 87%
160 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1from django.db import transaction
2from django.db.models import Prefetch
3from core.auxiliary.enums.unitOpData import SimulationObjectClass
4from core.auxiliary.methods.copy_flowsheet.copy_caching import ModelLookup, ModelLookupDict
5from core.auxiliary.methods.copy_flowsheet.copy_formulas import update_formulas
6from core.auxiliary.methods.copy_object import CopyObject, RemapOrRetainMode
7from core.auxiliary.models.PropertySet import PropertySet
8from core.auxiliary.models.PropertyInfo import PropertyInfo
9from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate
10from core.auxiliary.models.ControlValue import ControlValue
11from core.auxiliary.models.IndexedItem import IndexedItem
12from core.auxiliary.models.RecycleData import RecycleData
13from flowsheetInternals.unitops.models.Port import Port
14from flowsheetInternals.graphicData.models.groupingModel import Grouping
15from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject
16from flowsheetInternals.unitops.models import SimulationObject
class Coords:
    """A pair of ``x`` / ``y`` values, used here for positions and offsets."""

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __repr__(self):
        # Readable form for logs and debugger output instead of an object address.
        return f"{type(self).__name__}(x={self.x!r}, y={self.y!r})"
def calc_centre_simulation_objects(simulation_object_collection):
    """
    Return the centre of the bounding box around the given objects' graphics.

    The box is aggregated in the database over every `GraphicObject` whose
    ``simulationObject`` is in `simulation_object_collection`: it spans from
    the smallest ``x`` / ``y`` to the largest ``x + width`` / ``y + height``.

    Returns a `Coords` with the centre of that box. When the aggregation has
    nothing to work with (every aggregate comes back as ``None``, e.g. no
    graphic object matches), ``Coords(0, 0)`` is returned instead of failing
    with a ``TypeError`` on ``None + None``.
    """
    from django.db.models import Min, Max, F, ExpressionWrapper, FloatField

    bounds = GraphicObject.objects.filter(
        simulationObject__in=simulation_object_collection
    ).aggregate(
        min_x=Min('x'),
        max_edge_x=Max(ExpressionWrapper(F('x') + F('width'), output_field=FloatField())),
        min_y=Min('y'),
        max_edge_y=Max(ExpressionWrapper(F('y') + F('height'), output_field=FloatField())),
    )

    if all(value is None for value in bounds.values()):
        # Min/Max yield None when there are no rows to aggregate, so there is
        # no bounding box to take the centre of.
        # NOTE(review): the origin is used as a neutral fallback on the
        # assumption that callers only use the centre to offset graphics that
        # matched this same filter - confirm against other callers.
        return Coords(0, 0)

    centre_x = (bounds['min_x'] + bounds['max_edge_x']) / 2
    centre_y = (bounds['min_y'] + bounds['max_edge_y']) / 2
    return Coords(centre_x, centre_y)
class SimulationObjectDuplicator:
    """
    `CopyObject` handles the plain row duplication for the selected subset.

    The remaining methods here handle behaviours that are more specific to
    simulation-object copy/paste and therefore do not belong in the generic
    object copier:
    - rename copied simulation objects
    - offset copied graphics to the requested drop point
    - rebuild M2M and through-model relationships
    - copy controls only when both ends are part of the copied subset
    """

    def build_copy_spec(self, original_simulation_objects):
        """
        Build the nested `CopyObject` spec rooted at `SimulationObject`.

        The spec follows these relations from each simulation object:
        ``properties`` -> ``ContainedProperties`` -> ``values``,
        ``graphicObject``, ``ports``, ``grouping`` and ``recycleData``.
        Forward relations that may point outside the copied subset
        (``group``, ``stream``, ``tearObject``) are given an explicit
        `RemapOrRetainMode` policy.
        """
        return CopyObject(
            SimulationObject,
            queryset=original_simulation_objects,
            copy_relation_to={
                "properties": CopyObject(
                    PropertySet,
                    copy_relation_to={
                        "ContainedProperties": CopyObject(
                            PropertyInfo,
                            copy_relation_to={
                                "values": CopyObject(
                                    PropertyValue,
                                )
                            },
                        )
                    },
                ),
                "graphicObject": CopyObject(
                    GraphicObject,
                    forward_relation_policy={"group": RemapOrRetainMode.COPY_IF_SELECTED_ELSE_KEEP},
                ),
                "ports": CopyObject(
                    Port,
                    forward_relation_policy={"stream": RemapOrRetainMode.COPY_IF_SELECTED_ELSE_KEEP},
                ),
                "grouping": CopyObject(
                    Grouping,
                ),
                "recycleData": CopyObject(
                    RecycleData,
                    forward_relation_policy={"tearObject": RemapOrRetainMode.COPY_IF_SELECTED_ELSE_NULL},
                ),
            },
        )

    def duplicate(self, original_simulation_objects) -> ModelLookupDict:
        """
        Run the copy spec and return the lookups it filled in.

        The returned dict is keyed by model class; the other methods of this
        class read it to find the copy that corresponds to an original row.
        """
        model_lookups: ModelLookupDict = {}
        self.build_copy_spec(original_simulation_objects).copy(model_lookups)
        return model_lookups

    def update_simulation_object_names(self, model_lookups: ModelLookupDict) -> None:
        """Append ``" copy"`` to the ``componentName`` of every copied simulation object."""
        simulation_object_lookup = model_lookups.get(SimulationObject)
        if simulation_object_lookup is None:
            return

        simulation_objects = list(simulation_object_lookup)
        for simulation_object in simulation_objects:
            simulation_object.componentName = simulation_object.componentName + " copy"
        if simulation_objects:
            SimulationObject.objects.bulk_update(simulation_objects, ["componentName"])

    def update_graphic_positions(self, model_lookups: ModelLookupDict, delta: Coords) -> None:
        """Shift every copied graphic object by ``delta.x`` / ``delta.y``."""
        graphic_object_lookup = model_lookups.get(GraphicObject)
        if graphic_object_lookup is None:
            return

        graphic_objects = list(graphic_object_lookup)
        for graphic_object in graphic_objects:
            graphic_object.x += delta.x
            graphic_object.y += delta.y
        if graphic_objects:
            GraphicObject.objects.bulk_update(graphic_objects, ["x", "y"])

    def update_grouping_property_infos(self, model_lookups: ModelLookupDict) -> None:
        """
        Rebuild ``propertyInfos`` on each copied grouping.

        For every property info attached to the original grouping, the copied
        grouping receives the copied property info when one exists in the
        lookups, and the original property info otherwise. Groupings whose
        original has no property infos are left untouched.
        """
        grouping_lookup = model_lookups.get(Grouping)
        if grouping_lookup is None:
            return
        property_info_lookup = model_lookups.get(PropertyInfo)

        # Load all original groupings and their property infos up front
        # rather than issuing a pair of queries per copied grouping.
        original_groupings = Grouping.objects.filter(
            pk__in=list(grouping_lookup.model_map.keys())
        ).prefetch_related("propertyInfos")

        for original_grouping in original_groupings:
            copied_grouping = grouping_lookup.model_map[original_grouping.pk]
            copied_property_infos = []
            for original_property_info in original_grouping.propertyInfos.all():
                copied_property_info = None
                if property_info_lookup is not None:
                    copied_property_info = property_info_lookup.get_model(original_property_info.pk)
                copied_property_infos.append(copied_property_info or original_property_info)

            if copied_property_infos:
                copied_grouping.propertyInfos.set(copied_property_infos)

    def duplicate_control_values(self, model_lookups: ModelLookupDict) -> None:
        """
        Copy control values whose set point and manipulated value were both copied.

        Candidates are the `ControlValue` rows whose ``setPoint`` is one of
        the original property values. A candidate is skipped when either end
        has no copy in the lookups; otherwise a new `ControlValue` linking the
        two copies is created on the copied set point's flowsheet.
        """
        property_value_lookup = model_lookups.get(PropertyValue)
        if property_value_lookup is None:
            return

        original_value_ids = list(property_value_lookup.model_map.keys())
        original_control_values = ControlValue.objects.filter(
            setPoint_id__in=original_value_ids
        ).select_related("manipulated", "setPoint")

        new_control_values = []
        for original_control_value in original_control_values:
            new_setpoint = property_value_lookup.get_model(original_control_value.setPoint_id)
            new_manipulated = property_value_lookup.get_model(original_control_value.manipulated_id)
            if new_setpoint is None or new_manipulated is None:
                # Only copy controls when both ends are part of the copied subset.
                continue
            new_control_values.append(
                ControlValue(
                    setPoint=new_setpoint,
                    manipulated=new_manipulated,
                    flowsheet_id=new_setpoint.flowsheet_id,
                )
            )

        if new_control_values:
            ControlValue.objects.bulk_create(new_control_values)

    def duplicate_indexed_items(self, model_lookups: ModelLookupDict) -> None:
        """
        Copy the indexed items of the copied property values and relink them.

        Each distinct `IndexedItem` referenced by an original property value
        is copied once. Its owner becomes the copied simulation object when
        the original owner was copied, and stays the original owner otherwise.
        `PropertyValueIntermediate` rows are then created so that every copied
        property value points at the copies of the indexed items its original
        pointed at.
        """
        property_value_lookup = model_lookups.get(PropertyValue)
        if property_value_lookup is None:
            return
        simulation_object_lookup = model_lookups.get(SimulationObject)

        original_property_values = list(
            PropertyValue.objects
            .filter(pk__in=property_value_lookup.model_map.keys())
            .prefetch_related("indexedItems")
        )

        # original property value pk -> pks of the indexed items it references
        original_indexed_item_map = {
            original_property_value.pk: [
                indexed_item.pk for indexed_item in original_property_value.indexedItems.all()
            ]
            for original_property_value in original_property_values
        }

        original_indexed_ids = {
            pk for id_list in original_indexed_item_map.values() for pk in id_list
        }
        if not original_indexed_ids:
            return

        # original indexed item pk -> its saved copy
        copied_indexed_items = {}
        original_indexed_items = IndexedItem.objects.filter(pk__in=original_indexed_ids).select_related("owner")
        for original_indexed_item in original_indexed_items:
            new_owner = None
            if original_indexed_item.owner_id is not None and simulation_object_lookup is not None:
                new_owner = simulation_object_lookup.get_model(original_indexed_item.owner_id)
            if new_owner is None:
                # The owner is outside the copied subset: keep pointing at the original.
                new_owner = original_indexed_item.owner

            new_indexed_item = IndexedItem(
                owner=new_owner,
                key=original_indexed_item.key,
                displayName=original_indexed_item.displayName,
                type=original_indexed_item.type,
                flowsheet_id=original_indexed_item.flowsheet_id,
            )
            # Saved one by one so the primary key is available for the
            # through rows created below.
            new_indexed_item.save()
            copied_indexed_items[original_indexed_item.pk] = new_indexed_item

        all_intermediates = []
        for original_property_value in original_property_values:
            copied_property_value = property_value_lookup.get_model(original_property_value.pk)
            for indexed_item_pk in original_indexed_item_map[original_property_value.pk]:
                all_intermediates.append(
                    PropertyValueIntermediate(
                        propertyvalue_id=copied_property_value.pk,
                        indexeditem_id=copied_indexed_items[indexed_item_pk].pk,
                    )
                )
        if all_intermediates:
            PropertyValueIntermediate.objects.bulk_create(all_intermediates)

    def apply_recycle_updates(self, model_lookups: ModelLookupDict) -> None:
        """Call ``update`` on each copied recycle data that still has a tear object."""
        recycle_lookup = model_lookups.get(RecycleData)
        if recycle_lookup is None:
            return

        for copied_recycle_data in recycle_lookup:
            if copied_recycle_data.tearObject_id is not None:
                copied_recycle_data.update(copied_recycle_data.tearObject)
class DuplicateSimulationObject:
    """Entry point for duplicating a selection of simulation objects."""

    def handle_duplication_request(self, flowsheet: int, validated_data):
        """
        Duplicate the requested simulation objects around a drop point.

        `validated_data` is read for ``objectIDs`` (the selection) and for
        ``x`` / ``y`` (the centre the copies are placed around). The selection
        is expanded to include the contents of any selected groups, copied,
        and the copies are then renamed, repositioned and relinked. All
        database work runs inside a single transaction.

        `flowsheet` is accepted for the caller's benefit but is not used here.

        Returns ``None``; nothing is done when the selection is empty or
        matches no simulation objects.
        """
        object_ids = validated_data.get('objectIDs') or []
        if not object_ids:
            return

        with transaction.atomic():
            expanded_ids = self._expand_object_ids(object_ids)
            if not expanded_ids:
                return

            original_simulation_objects = list(
                SimulationObject.objects
                .filter(pk__in=expanded_ids)
                .select_related('flowsheet', 'grouping', 'recycleData')
                .prefetch_related(
                    'graphicObject',
                    'ports',
                    'properties__ContainedProperties__values__indexedItems',
                    Prefetch('grouping__graphicObjects', queryset=GraphicObject.objects.select_related('simulationObject')),
                    'grouping__propertyInfos',
                )
            )

            if not original_simulation_objects:
                return

            # Offset that moves the centre of the originals onto the requested point.
            old_centre = calc_centre_simulation_objects(original_simulation_objects)
            new_centre = Coords(validated_data.get('x'), validated_data.get('y'))
            delta = Coords(new_centre.x - old_centre.x, new_centre.y - old_centre.y)

            duplicator = SimulationObjectDuplicator()
            model_lookups = duplicator.duplicate(original_simulation_objects)
            duplicator.update_simulation_object_names(model_lookups)
            duplicator.update_graphic_positions(model_lookups, delta)
            duplicator.update_grouping_property_infos(model_lookups)
            duplicator.duplicate_control_values(model_lookups)
            duplicator.duplicate_indexed_items(model_lookups)
            update_formulas(model_lookups, allow_not_found=True)
            duplicator.apply_recycle_updates(model_lookups)

    def _expand_object_ids(self, object_ids):
        """Recursively collect all simulation objects contained within selected groups."""
        if not object_ids:
            return set()

        discovered = set()
        queue = set(object_ids)

        # Breadth-first walk: each pass loads the pending ids in one query and
        # queues the children of any group found among them.
        while queue:
            batch_ids = list(queue)
            queue.clear()
            queryset = (
                SimulationObject.objects
                .filter(pk__in=batch_ids)
                .select_related('grouping')
                .prefetch_related(
                    Prefetch(
                        'grouping__graphicObjects',
                        queryset=GraphicObject.objects.select_related('simulationObject')
                    )
                )
            )

            for simulation_object in queryset:
                if simulation_object.pk in discovered:
                    continue

                discovered.add(simulation_object.pk)

                grouping = getattr(simulation_object, 'grouping', None)
                if simulation_object.objectType == SimulationObjectClass.Group and grouping is not None:
                    for graphic_object in grouping.graphicObjects.all():
                        child = graphic_object.simulationObject
                        # Skipping already-discovered children keeps the walk finite.
                        if child and child.pk not in discovered:
                            queue.add(child.pk)

        return discovered