Coverage for backend/django/flowsheetInternals/unitops/viewsets/DuplicateSimulationObject.py: 87%

160 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1from django.db import transaction 

2from django.db.models import Prefetch 

3from core.auxiliary.enums.unitOpData import SimulationObjectClass 

4from core.auxiliary.methods.copy_flowsheet.copy_caching import ModelLookup, ModelLookupDict 

5from core.auxiliary.methods.copy_flowsheet.copy_formulas import update_formulas 

6from core.auxiliary.methods.copy_object import CopyObject, RemapOrRetainMode 

7from core.auxiliary.models.PropertySet import PropertySet 

8from core.auxiliary.models.PropertyInfo import PropertyInfo 

9from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate 

10from core.auxiliary.models.ControlValue import ControlValue 

11from core.auxiliary.models.IndexedItem import IndexedItem 

12from core.auxiliary.models.RecycleData import RecycleData 

13from flowsheetInternals.unitops.models.Port import Port 

14from flowsheetInternals.graphicData.models.groupingModel import Grouping 

15from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject 

16from flowsheetInternals.unitops.models import SimulationObject 

17 

class Coords:
    """Simple mutable holder for a pair of 2-D values exposed as ``x`` and ``y``."""

    def __init__(self, x, y):
        # Both components are stored as given; nothing is validated or coerced.
        self.x, self.y = x, y

22 

23 

def calc_centre_simulation_objects(simulation_object_collection):
    """
    Return the centre of the bounding box around the given objects' graphics.

    The box spans from the smallest ``x``/``y`` to the largest
    ``x + width``/``y + height`` over every `GraphicObject` whose
    ``simulationObject`` is in ``simulation_object_collection``.

    Returns:
        Coords: the midpoint of that box. When the bounds cannot be computed
        (no matching graphic objects, or an aggregate came back ``None``),
        ``Coords(0, 0)`` is returned instead of failing on ``None`` arithmetic.
    """
    from django.db.models import Min, Max, F, ExpressionWrapper, FloatField

    bounds = GraphicObject.objects.filter(
        simulationObject__in=simulation_object_collection
    ).aggregate(
        min_x=Min('x'),
        max_edge_x=Max(ExpressionWrapper(F('x') + F('width'), output_field=FloatField())),
        min_y=Min('y'),
        max_edge_y=Max(ExpressionWrapper(F('y') + F('height'), output_field=FloatField())),
    )

    # Min/Max aggregate to None over an empty result set; adding those would
    # raise TypeError, so fall back to the origin when any bound is missing.
    if any(bound is None for bound in bounds.values()):
        return Coords(0, 0)

    centre_x = (bounds['min_x'] + bounds['max_edge_x']) / 2
    centre_y = (bounds['min_y'] + bounds['max_edge_y']) / 2
    return Coords(centre_x, centre_y)

37 

38 

class SimulationObjectDuplicator:
    """
    `CopyObject` handles the plain row duplication for the selected subset.

    The remaining methods here handle behaviours that are more specific to
    simulation-object copy/paste and therefore do not belong in the generic
    object copier:
    - rename copied simulation objects
    - offset copied graphics to the requested drop point
    - rebuild M2M and through-model relationships
    - copy controls only when both ends are part of the copied subset

    Every post-copy method takes the `ModelLookupDict` filled in by
    `duplicate`. As used in this class, a lookup's ``model_map`` is keyed by
    the original row's primary key and holds the corresponding copy.
    """

    def build_copy_spec(self, original_simulation_objects):
        """Build the nested `CopyObject` spec describing which relations are copied."""
        return CopyObject(
            SimulationObject,
            queryset=original_simulation_objects,
            copy_relation_to={
                "properties": CopyObject(
                    PropertySet,
                    copy_relation_to={
                        "ContainedProperties": CopyObject(
                            PropertyInfo,
                            copy_relation_to={
                                "values": CopyObject(
                                    PropertyValue,
                                )
                            },
                        )
                    },
                ),
                "graphicObject": CopyObject(
                    GraphicObject,
                    forward_relation_policy={"group": RemapOrRetainMode.COPY_IF_SELECTED_ELSE_KEEP},
                ),
                "ports": CopyObject(
                    Port,
                    forward_relation_policy={"stream": RemapOrRetainMode.COPY_IF_SELECTED_ELSE_KEEP},
                ),
                "grouping": CopyObject(
                    Grouping,
                ),
                "recycleData": CopyObject(
                    RecycleData,
                    forward_relation_policy={"tearObject": RemapOrRetainMode.COPY_IF_SELECTED_ELSE_NULL},
                ),
            },
        )

    def duplicate(self, original_simulation_objects) -> ModelLookupDict:
        """Run the copy spec and return the lookups it populated."""
        model_lookups: ModelLookupDict = {}
        self.build_copy_spec(original_simulation_objects).copy(model_lookups)
        return model_lookups

    def update_simulation_object_names(self, model_lookups: ModelLookupDict) -> None:
        """Append ``" copy"`` to the ``componentName`` of every copied simulation object."""
        simulation_objects = list(model_lookups.get(SimulationObject, ModelLookup([])))
        for simulation_object in simulation_objects:
            simulation_object.componentName = simulation_object.componentName + " copy"
        if simulation_objects:
            SimulationObject.objects.bulk_update(simulation_objects, ["componentName"])

    def update_graphic_positions(self, model_lookups: ModelLookupDict, delta: Coords) -> None:
        """Shift every copied graphic object by ``delta`` and persist the new position."""
        graphic_objects = list(model_lookups.get(GraphicObject, ModelLookup([])))
        for graphic_object in graphic_objects:
            graphic_object.x += delta.x
            graphic_object.y += delta.y
        if graphic_objects:
            GraphicObject.objects.bulk_update(graphic_objects, ["x", "y"])

    def update_grouping_property_infos(self, model_lookups: ModelLookupDict) -> None:
        """
        Rebuild the ``propertyInfos`` relation on each copied grouping.

        Each property info attached to the original grouping is replaced by
        its copy when one exists in the lookups; otherwise the original
        property info is attached to the copied grouping as-is.

        Raises:
            Grouping.DoesNotExist: if an original grouping row is missing.
        """
        grouping_lookup = model_lookups.get(Grouping)
        if grouping_lookup is None:
            return
        property_info_lookup = model_lookups.get(PropertyInfo)

        # Load every original grouping and its property infos in one batch
        # rather than issuing a get() plus a relation query per grouping.
        original_groupings = Grouping.objects.prefetch_related("propertyInfos").in_bulk(
            list(grouping_lookup.model_map.keys())
        )

        for original_grouping_pk, copied_grouping in grouping_lookup.model_map.items():
            original_grouping = original_groupings.get(original_grouping_pk)
            if original_grouping is None:
                # Same exception type the per-row get() used to raise.
                raise Grouping.DoesNotExist(
                    f"Grouping matching pk={original_grouping_pk!r} does not exist."
                )

            copied_property_infos = []
            for original_property_info in original_grouping.propertyInfos.all():
                if property_info_lookup is None:
                    copied_property_infos.append(original_property_info)
                    continue
                copied_property_info = property_info_lookup.get_model(original_property_info.pk)
                copied_property_infos.append(copied_property_info or original_property_info)

            if copied_property_infos:
                copied_grouping.propertyInfos.set(copied_property_infos)

    def duplicate_control_values(self, model_lookups: ModelLookupDict) -> None:
        """
        Recreate control values between copied property values.

        A control value is copied only when both its set point and its
        manipulated value have a copy in the property-value lookup; controls
        reaching outside the copied subset are skipped.
        """
        property_value_lookup = model_lookups.get(PropertyValue)
        if property_value_lookup is None:
            return

        original_value_ids = list(property_value_lookup.model_map.keys())
        original_control_values = ControlValue.objects.filter(
            setPoint_id__in=original_value_ids
        ).select_related("manipulated", "setPoint")

        new_control_values = []
        for original_control_value in original_control_values:
            new_setpoint = property_value_lookup.get_model(original_control_value.setPoint_id)
            new_manipulated = property_value_lookup.get_model(original_control_value.manipulated_id)
            if new_setpoint is None or new_manipulated is None:
                continue
            new_control_values.append(
                ControlValue(
                    setPoint=new_setpoint,
                    manipulated=new_manipulated,
                    flowsheet_id=new_setpoint.flowsheet_id,
                )
            )

        if new_control_values:
            ControlValue.objects.bulk_create(new_control_values)

    def duplicate_indexed_items(self, model_lookups: ModelLookupDict) -> None:
        """
        Copy the indexed items of the copied property values and relink them.

        Each distinct `IndexedItem` referenced by an original property value
        is copied once. The copy's owner is the copied simulation object when
        the original owner was part of the copy, otherwise the original owner.
        `PropertyValueIntermediate` rows then link every copied property
        value to the copied indexed items.
        """
        property_value_lookup = model_lookups.get(PropertyValue)
        simulation_object_lookup = model_lookups.get(SimulationObject)
        if property_value_lookup is None:
            return

        original_property_values = list(
            PropertyValue.objects
            .filter(pk__in=property_value_lookup.model_map.keys())
            .prefetch_related("indexedItems")
        )

        # original property value pk -> pks of the indexed items it references
        original_indexed_item_map = {
            original_property_value.pk: [
                indexed_item.pk for indexed_item in original_property_value.indexedItems.all()
            ]
            for original_property_value in original_property_values
        }

        original_indexed_ids = {
            pk for id_list in original_indexed_item_map.values() for pk in id_list
        }
        if not original_indexed_ids:
            return

        indexed_item_lookup = ModelLookup([])
        original_indexed_items = IndexedItem.objects.filter(pk__in=original_indexed_ids).select_related("owner")
        for original_indexed_item in original_indexed_items:
            new_owner = None
            if original_indexed_item.owner_id is not None and simulation_object_lookup is not None:
                new_owner = simulation_object_lookup.get_model(original_indexed_item.owner_id)
            if new_owner is None:
                # Owner was not part of the copy (or there is none): keep the original.
                new_owner = original_indexed_item.owner

            new_indexed_item = IndexedItem(
                owner=new_owner,
                key=original_indexed_item.key,
                displayName=original_indexed_item.displayName,
                type=original_indexed_item.type,
                flowsheet_id=original_indexed_item.flowsheet_id,
            )
            # Saved one by one so the copy has a primary key before it is linked below.
            new_indexed_item.save()
            indexed_item_lookup.model_map[original_indexed_item.pk] = new_indexed_item

        all_intermediates = []
        for original_property_value in original_property_values:
            copied_property_value = property_value_lookup.get_model(original_property_value.pk)
            for indexed_item_pk in original_indexed_item_map[original_property_value.pk]:
                all_intermediates.append(
                    PropertyValueIntermediate(
                        propertyvalue_id=copied_property_value.pk,
                        indexeditem_id=indexed_item_lookup.get_model(indexed_item_pk).pk,
                    )
                )
        if all_intermediates:
            PropertyValueIntermediate.objects.bulk_create(all_intermediates)

    def apply_recycle_updates(self, model_lookups: ModelLookupDict) -> None:
        """Call ``update`` on each copied recycle record that still has a tear object."""
        recycle_lookup = model_lookups.get(RecycleData)
        if recycle_lookup is None:
            return

        for copied_recycle_data in recycle_lookup:
            if copied_recycle_data.tearObject_id is not None:
                # NOTE(review): `update` is defined on RecycleData outside this file;
                # presumably it re-syncs the record with its tear object - confirm.
                copied_recycle_data.update(copied_recycle_data.tearObject)

220 

221 

class DuplicateSimulationObject:
    """Entry point that duplicates a selection of simulation objects at a drop point."""

    def handle_duplication_request(self, flowsheet: int, validated_data):
        """
        Duplicate the objects named in ``validated_data`` inside one transaction.

        Args:
            flowsheet: not read by this method.
            validated_data: mapping providing ``'objectIDs'`` (ids of the
                selected simulation objects) and ``'x'``/``'y'`` (the centre
                the copies should be placed around).

        Returns:
            None. Nothing is copied when no ids are supplied or none resolve
            to existing simulation objects.
        """
        object_ids = validated_data.get('objectIDs') or []
        if not object_ids:
            return

        with transaction.atomic():
            expanded_ids = self._expand_object_ids(object_ids)
            if not expanded_ids:
                return

            original_simulation_objects = list(
                SimulationObject.objects
                .filter(pk__in=expanded_ids)
                .select_related('flowsheet', 'grouping', 'recycleData')
                .prefetch_related(
                    'graphicObject',
                    'ports',
                    'properties__ContainedProperties__values__indexedItems',
                    Prefetch('grouping__graphicObjects', queryset=GraphicObject.objects.select_related('simulationObject')),
                    'grouping__propertyInfos',
                )
            )

            if not original_simulation_objects:
                return

            # calculate the centre of the original simulation objects
            old_centre = calc_centre_simulation_objects(original_simulation_objects)
            # the requested centre of the duplicated simulation objects
            new_centre = Coords(validated_data.get('x'), validated_data.get('y'))
            delta = Coords(new_centre.x - old_centre.x, new_centre.y - old_centre.y)

            duplicator = SimulationObjectDuplicator()
            model_lookups = duplicator.duplicate(original_simulation_objects)
            duplicator.update_simulation_object_names(model_lookups)
            duplicator.update_graphic_positions(model_lookups, delta)
            duplicator.update_grouping_property_infos(model_lookups)
            duplicator.duplicate_control_values(model_lookups)
            duplicator.duplicate_indexed_items(model_lookups)
            update_formulas(model_lookups, allow_not_found=True)
            duplicator.apply_recycle_updates(model_lookups)

    def _expand_object_ids(self, object_ids):
        """
        Recursively collect all simulation objects contained within selected groups.

        Returns:
            set: primary keys of every requested object that exists, plus the
            objects reachable through the graphic objects of any group among
            them. Empty when ``object_ids`` is empty.
        """
        if not object_ids:
            return set()

        discovered = set()
        queue = set(object_ids)

        # Breadth-first: each pass loads the current frontier in one query and
        # queues the not-yet-seen children of any groups found in it.
        while queue:
            batch_ids = list(queue)
            queue.clear()
            queryset = (
                SimulationObject.objects
                .filter(pk__in=batch_ids)
                .select_related('grouping')
                .prefetch_related(
                    Prefetch(
                        'grouping__graphicObjects',
                        queryset=GraphicObject.objects.select_related('simulationObject')
                    )
                )
            )

            for simulation_object in queryset:
                if simulation_object.pk in discovered:
                    continue

                discovered.add(simulation_object.pk)

                # getattr with a default tolerates objects that have no grouping.
                grouping = getattr(simulation_object, 'grouping', None)
                if simulation_object.objectType == SimulationObjectClass.Group and grouping is not None:
                    for graphic_object in grouping.graphicObjects.all():
                        child = graphic_object.simulationObject
                        if child and child.pk not in discovered:
                            queue.add(child.pk)

        return discovered