Coverage for backend/flowsheetInternals/unitops/viewsets/DuplicateSimulationObject.py: 99%

140 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1from django.db import transaction 

2from django.db.models import Prefetch 

3from itertools import chain 

4from core.auxiliary.models.PropertySet import PropertySet 

5from core.auxiliary.models.PropertyInfo import PropertyInfo 

6from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate 

7from core.auxiliary.models.IndexedItem import IndexedItem 

8from flowsheetInternals.propertyPackages.models.SimulationObjectPropertyPackages import SimulationObjectPropertyPackages 

9from flowsheetInternals.unitops.models.Port import Port 

10from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject 

11from flowsheetInternals.unitops.models import SimulationObject 

12 

13class Coords: 

14 def __init__(self, x, y): 

15 self.x = x 

16 self.y = y 

17 

18def calc_centre_simulation_objects(simulation_object_collection): 

19 from django.db.models import Min, Max, F, ExpressionWrapper, FloatField 

20 aggregated = GraphicObject.objects.filter( 

21 simulationObject__in=simulation_object_collection 

22 ).aggregate( 

23 min_x=Min('x'), 

24 max_edge_x=Max(ExpressionWrapper(F('x') + F('width'), output_field=FloatField())), 

25 min_y=Min('y'), 

26 max_edge_y=Max(ExpressionWrapper(F('y') + F('height'), output_field=FloatField())) 

27 ) 

28 centre_x = (aggregated['min_x'] + aggregated['max_edge_x']) / 2 

29 centre_y = (aggregated['min_y'] + aggregated['max_edge_y']) / 2 

30 return Coords(centre_x, centre_y) 

31 

32def simulation_object_relative_location(old_centre: Coords, old_simulation_object: SimulationObject): 

33 graphic_object = old_simulation_object.graphicObject.last() 

34 return Coords(graphic_object.x - old_centre.x, graphic_object.y - old_centre.y) 

35 

36 

37class SimulationObjectDuplicator: 

38 

39 def __init__(self, flowsheet_id): 

40 self.flowsheet_id = flowsheet_id 

41 

42 def create_duplicate_simulation_objects(self, original_simulation_objects): 

43 original_to_duplicate_map = {} 

44 new_simulation_objects = [] 

45 for original_simulation_object in original_simulation_objects: 

46 new_simulation_object = SimulationObject( 

47 flowsheet=original_simulation_object.flowsheet, 

48 objectType=original_simulation_object.objectType, 

49 componentName=original_simulation_object.componentName + ' copy' 

50 ) 

51 original_to_duplicate_map[original_simulation_object] = new_simulation_object 

52 new_simulation_objects.append(new_simulation_object) 

53 return original_to_duplicate_map, new_simulation_objects 

54 

55 def duplicate_graphics(self, old_centre, new_centre, original_simulation_objects, original_to_duplicate_map): 

56 all_graphics = [] 

57 property_set_map = {} 

58 all_property_sets = [] 

59 for original_simulation_object in original_simulation_objects: 

60 new_simulation_object = original_to_duplicate_map[original_simulation_object] 

61 relative_location = simulation_object_relative_location(old_centre, original_simulation_object) 

62 graphic_object = original_simulation_object.graphicObject.last() 

63 new_graphics_object = GraphicObject( 

64 simulationObject=new_simulation_object, 

65 width=graphic_object.width, 

66 height=graphic_object.height, 

67 x=new_centre.x + relative_location.x, 

68 y=new_centre.y + relative_location.y, 

69 group=graphic_object.group, 

70 flowsheet_id=self.flowsheet_id 

71 ) 

72 all_graphics.append(new_graphics_object) 

73 old_set = original_simulation_object.properties 

74 new_set = PropertySet(simulationObject=new_simulation_object, flowsheet_id=self.flowsheet_id) 

75 property_set_map[old_set.pk] = new_set 

76 all_property_sets.append(new_set) 

77 return all_graphics, property_set_map, all_property_sets 

78 

79 def duplicate_port_data(self, original_simulation_objects, original_to_duplicate_map): 

80 all_ports_to_create = [] 

81 all_ports_updates = [] 

82 all_original_ports = Port.objects.filter(unitOp__in=original_simulation_objects).select_related('unitOp') 

83 port_map = {} 

84 for port in all_original_ports: 

85 new_unitOp = original_to_duplicate_map[port.unitOp] 

86 new_port = Port( 

87 displayName=port.displayName, 

88 key=port.key, 

89 index=port.index, 

90 direction=port.direction, 

91 unitOp=new_unitOp, 

92 flowsheet_id=self.flowsheet_id 

93 ) 

94 port_map[port.pk] = new_port 

95 all_ports_to_create.append(new_port) 

96 return all_ports_to_create, all_ports_updates, port_map 

97 

98 def update_streams(self, original_simulation_objects, original_to_duplicate_map, port_map): 

99 connected_ports = list(chain.from_iterable( 

100 [list(sim_obj.connectedPorts.all()) for sim_obj in original_simulation_objects] 

101 )) 

102 all_ports_updates = [] 

103 for connected_port in connected_ports: 

104 if connected_port.unitOp in original_to_duplicate_map: 

105 new_unitOp = original_to_duplicate_map[connected_port.unitOp] 

106 new_port = port_map.get(connected_port.pk) 

107 if new_port: 107 ↛ 103line 107 didn't jump to line 103 because the condition on line 107 was always true

108 new_port.stream = original_to_duplicate_map.get(connected_port.stream, new_unitOp) 

109 all_ports_updates.append(new_port) 

110 return all_ports_updates 

111 

112 def duplicate_properties(self, property_set_map): 

113 all_property_sets = list(property_set_map.values()) 

114 PropertySet.objects.bulk_create(all_property_sets) 

115 

116 all_original_set_ids = list(property_set_map.keys()) 

117 original_property_infos = list(PropertyInfo.objects.filter(set_id__in=all_original_set_ids)) 

118 original_property_values = list( 

119 PropertyValue.objects 

120 .filter(property_id__in=[property_info.pk for property_info in original_property_infos]) 

121 .prefetch_related('indexedItems') 

122 ) 

123 

124 # creates a map of original property values to their associated indexed items 

125 original_indexed_item_map = {} 

126 for original_property_value in original_property_values: 

127 indexed_item_ids = [indexed_item.pk for indexed_item in original_property_value.indexedItems.all()] 

128 if original_property_value.pk not in original_indexed_item_map: 128 ↛ 130line 128 didn't jump to line 130 because the condition on line 128 was always true

129 original_indexed_item_map[original_property_value.pk] = [] 

130 original_indexed_item_map[original_property_value.pk] += indexed_item_ids 

131 

132 # creates and bulk inserts property infos 

133 property_info_map = {} 

134 all_property_infos = [] 

135 for original_property_info in original_property_infos: 

136 new_info = PropertyInfo( 

137 set=property_set_map[original_property_info.set_id], 

138 type=original_property_info.type, 

139 unitType=original_property_info.unitType, 

140 unit=original_property_info.unit, 

141 expression=original_property_info.expression, 

142 key=original_property_info.key, 

143 displayName=original_property_info.displayName, 

144 index=original_property_info.index, 

145 flowsheet_id=self.flowsheet_id 

146 ) 

147 property_info_map[original_property_info.pk] = new_info 

148 all_property_infos.append(new_info) 

149 PropertyInfo.objects.bulk_create(all_property_infos) 

150 

151 # creates and bulk inserts property values 

152 value_map = {} 

153 all_property_values = [] 

154 for original_property_value in original_property_values: 

155 new_val = PropertyValue( 

156 enabled=original_property_value.enabled, 

157 value=original_property_value.value, 

158 displayValue=original_property_value.displayValue, 

159 formula=original_property_value.formula, 

160 property=property_info_map[original_property_value.property_id], 

161 flowsheet_id=self.flowsheet_id 

162 ) 

163 value_map[original_property_value.pk] = new_val 

164 all_property_values.append(new_val) 

165 PropertyValue.objects.bulk_create(all_property_values) 

166 

167 # creates and bulk inserts indexed items 

168 all_indexed_items = [] 

169 # gather all unique indexed item IDs from the original_indexed_item_map 

170 original_indexed_ids = { 

171 pk for id_list in original_indexed_item_map.values() for pk in id_list 

172 } 

173 original_indexed_items = IndexedItem.objects.filter(pk__in=original_indexed_ids) 

174 indexed_item_map = {} 

175 for original_indexed_item in original_indexed_items: 

176 owner_simulation_object = list(value_map.values())[0].property.set.simulationObject 

177 new_indexed_item = IndexedItem( 

178 owner=owner_simulation_object, 

179 key=original_indexed_item.key, 

180 displayName=original_indexed_item.displayName, 

181 type=original_indexed_item.type, 

182 flowsheet_id=self.flowsheet_id 

183 ) 

184 indexed_item_map[original_indexed_item.pk] = new_indexed_item 

185 all_indexed_items.append(new_indexed_item) 

186 IndexedItem.objects.bulk_create(all_indexed_items) 

187 

188 # creates and bulk inserts intermediary records  

189 # linking duplicated property values to their associated indexed items 

190 all_intermediates = [] 

191 for original_property_value_pk, idx_list in original_indexed_item_map.items(): 

192 for idx_pk in idx_list: 

193 new_int = PropertyValueIntermediate( 

194 propertyvalue_id=value_map[original_property_value_pk].pk, 

195 indexeditem_id=indexed_item_map[idx_pk].pk 

196 ) 

197 all_intermediates.append(new_int) 

198 PropertyValueIntermediate.objects.bulk_create(all_intermediates) 

199 

200 def duplicate_packages(self, original_simulation_objects, original_to_duplicate_map): 

201 new_property_packages = [] 

202 for original_simulation_object in original_simulation_objects: 

203 new_simulation_object = original_to_duplicate_map[original_simulation_object] 

204 for original_package in original_simulation_object.propertyPackages.all(): 

205 new_property_packages.append( 

206 SimulationObjectPropertyPackages( 

207 simulationObject=new_simulation_object, 

208 name=original_package.name, 

209 propertyPackage=original_package.propertyPackage, 

210 flowsheet_id=self.flowsheet_id 

211 ) 

212 ) 

213 SimulationObjectPropertyPackages.objects.bulk_create(new_property_packages) 

214 

215 

216class DuplicateSimulationObject: 

217 def handle_duplication_request(self, flowsheet: int, validated_data): 

218 with transaction.atomic(): 

219 # get the original simulation objects 

220 original_simulation_objects = ( 

221 SimulationObject.objects 

222 .filter(pk__in=validated_data.get('objectIDs')) 

223 .select_related('flowsheet') 

224 .prefetch_related( 

225 'properties', 

226 'propertyPackages', 

227 'graphicObject', 

228 Prefetch('connectedPorts', queryset=Port.objects.select_related('unitOp', 'stream')) 

229 ) 

230 ) 

231 

232 # calculate the centre of the original simulation objects 

233 old_centre = calc_centre_simulation_objects(original_simulation_objects) 

234 # calculate the new centre of the duplicated simulation objects 

235 new_centre = Coords(validated_data.get('x'), validated_data.get('y')) 

236 

237 # create a new simulation object for each original simulation object 

238 duplicator = SimulationObjectDuplicator(flowsheet) 

239 original_to_duplicate_map, new_simulation_objects = duplicator.create_duplicate_simulation_objects(original_simulation_objects) 

240 SimulationObject.objects.bulk_create(new_simulation_objects) 

241 

242 # duplicate the graphics, properties, and packages 

243 all_graphics, property_set_map, all_property_sets = duplicator.duplicate_graphics( 

244 old_centre, new_centre, original_simulation_objects, original_to_duplicate_map 

245 ) 

246 

247 duplicator.duplicate_packages(original_simulation_objects, original_to_duplicate_map) 

248 

249 # duplicate the ports and update the streams 

250 all_ports_to_create, all_ports_updates, port_map = duplicator.duplicate_port_data( 

251 original_simulation_objects, original_to_duplicate_map 

252 ) 

253 Port.objects.bulk_create(all_ports_to_create) 

254 

255 ports_updates = duplicator.update_streams(original_simulation_objects, original_to_duplicate_map, port_map) 

256 Port.objects.bulk_update(ports_updates, ['stream']) 

257 

258 GraphicObject.objects.bulk_create(all_graphics) 

259 duplicator.duplicate_properties(property_set_map) 

260 

261 simulation_objects_to_update = list(original_to_duplicate_map.values()) 

262 SimulationObject.objects.bulk_update(simulation_objects_to_update, ['flowsheet','objectType'])