Coverage for backend/django/flowsheetInternals/unitops/models/simulation_object_factory.py: 94%

270 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

1import itertools 

2from typing import List, Tuple 

3 

4from core.auxiliary.models.Flowsheet import Flowsheet 

5from core.auxiliary.models.IndexedItem import IndexedItem 

6from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate 

7from core.auxiliary.models.PropertyInfo import PropertyInfo 

8from core.auxiliary.models.RecycleData import RecycleData 

9from core.auxiliary.models.PropertySet import PropertySet 

10from core.auxiliary.enums import ConType, heat_exchange_ops 

11 

12from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

13from flowsheetInternals.unitops.models.Port import Port 

14from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject 

15from flowsheetInternals.graphicData.models.groupingModel import Grouping 

16from flowsheetInternals.unitops.models.compound_propogation import update_compounds_on_add_stream 

17 

18from flowsheetInternals.unitops.config.config_methods import * 

19from flowsheetInternals.unitops.config.config_base import configuration 

20from common.config_types import * 

21from core.auxiliary.models.ObjectTypeCounter import ObjectTypeCounter 

22from core.auxiliary.views.ExtractSegmentDataFromFS import create_he_streams 

23 

24 

25class SimulationObjectFactory: 

26 def __init__(self) -> None: 

27 self._flowsheet: Flowsheet | None = None 

28 

29 self.simulation_objects: list[SimulationObject] = [] 

30 self.graphic_objects: list[GraphicObject] = [] 

31 self.property_sets: list[PropertySet] = [] 

32 self.property_infos: list[PropertyInfo] = [] 

33 self.property_values: list[PropertyValue] = [] 

34 self.ports: list[Port] = [] 

35 self.index_items: list[IndexedItem] = [] 

36 self.property_value_indexed_items: List[PropertyValueIntermediate] = [] 

37 

38 # conditionally created objects 

39 self.recycle_data: list[RecycleData] = [] 

40 

41 self._unitop: SimulationObject = None # the unitop of the object being created 

42 self._current_port: Port = None # the current port for the stream being created 

43 

44 self._idx_map: dict[int, SimulationObject] = {} # map of id to simulation object 

45 self._property_set_map: dict[int, list[PropertySet]] = {} # map of simulation object to property sets for the many-to-many relationship 

46 self._index_items_map: dict[int, dict[str, list[IndexedItem]]] = {} # map of simulation object to indexed items for the many-to-many relationship 

47 

48 # For storing old property data when replacing the gut 

49 self.old_properties: dict[str, any] = {} 

50 

51 @classmethod 

52 def create_simulation_object(cls, coordinates: dict[str, float] | None = None, createPropertySet: bool = True, flowsheet=None, parentGroup=None, **kwargs) -> SimulationObject: 

53 """ 

54 Creates a new unitop with attached ports, streams etc 

55 """ 

56 factory = SimulationObjectFactory() 

57 

58 # Create unitop 

59 # if parentGroup is provided, fetch it, else fallback to flowsheet.rootGrouping. 

60 if parentGroup: 

61 group = Grouping.objects.get(pk=parentGroup) 

62 else: 

63 if flowsheet: 63 ↛ 66line 63 didn't jump to line 66 because the condition on line 63 was always true

64 group = flowsheet.rootGrouping 

65 else: 

66 group = None # or raise an error if flowsheet is required 

67 factory._flowsheet = flowsheet 

68 object_type = kwargs.pop("objectType") 

69 if kwargs.get("schema") is not None: 

70 object_schema = kwargs.pop("schema") 

71 else: 

72 object_schema: ObjectType = configuration[object_type] 

73 

74 

75 if coordinates is None: 

76 coordinates = {"x": 0, "y": 0} 

77 

78 unitop = factory.create( 

79 object_type=object_type, 

80 object_schema=object_schema, 

81 coordinates=coordinates, 

82 flowsheet=flowsheet, 

83 createPropertySet=createPropertySet, 

84 parentGroup= group, 

85 ) 

86 

87 factory._unitop = unitop 

88 graphic_object = factory.graphic_objects[-1] 

89 

90 # count the number of inlet and outlet ports 

91 num_inlets = len([port for port in factory.ports if port.direction == ConType.Inlet]) 

92 

93 num_outlets = len([port for port in factory.ports 

94 if ((port.direction == ConType.Outlet) and 

95 (factory._unitop.schema.ports[port.key].makeStream==True))]) 

96 

97 

98 

99 

100 inlet_index = 0 

101 outlet_index = 0 

102 

103 if kwargs.get("create_attached_streams", True): 

104 for port in factory.ports: 

105 factory._current_port = port 

106 port_schema = factory._unitop.schema.ports[port.key] 

107 if port_schema.makeStream is False: 

108 continue 

109 stream_type = port_schema.streamType 

110 

111 stream_schema = configuration[stream_type] 

112 # evenly space the ports along the sides of the unitop  

113 if port.direction == ConType.Inlet: 

114 coordinates = port.default_stream_position(factory._unitop, graphic_object, inlet_index, num_inlets) 

115 inlet_index += 1 

116 else: 

117 coordinates = port.default_stream_position(factory._unitop, graphic_object, outlet_index, num_outlets) 

118 outlet_index += 1 

119 

120 stream_name = port.default_stream_name(factory._unitop) 

121 stream = factory.create( 

122 object_type=stream_type, 

123 object_schema=stream_schema, 

124 coordinates=coordinates, 

125 flowsheet=flowsheet, 

126 componentName=stream_name, 

127 parentGroup=group, 

128 ) 

129 

130 port.stream = stream 

131 

132 # save all created objects 

133 factory.perform_bulk_create() 

134 

135 if unitop.objectType in heat_exchange_ops: 

136 create_he_streams(unitop, group) 

137 

138 # Propagate streams to other groups for the newly created unitop 

139 inlet_streams = [] 

140 outlet_streams = [] 

141 for port in unitop.ports.all(): 

142 if port.stream: 

143 if port.direction == ConType.Inlet: 

144 inlet_streams.append(port.stream) 

145 else: 

146 outlet_streams.append(port.stream) 

147 from flowsheetInternals.graphicData.logic.make_group import propagate_streams 

148 propagate_streams(inlet_streams, ConType.Inlet) 

149 propagate_streams(outlet_streams, ConType.Outlet) 

150 

151 return unitop 

152 

153 

154 @classmethod 

155 def create_stream_at_port(cls, port: Port) -> SimulationObject: 

156 """ 

157 Creates a new stream attached to the specified port. 

158 """ 

159 factory = SimulationObjectFactory() 

160 factory._current_port = port 

161 factory._unitop = port.unitOp 

162 factory._flowsheet = factory._unitop.flowsheet 

163 

164 # Figure out what type of stream to create based on port config 

165 # E.g energy_stream, stream, etc 

166 object_type = factory._unitop.schema.ports[port.key].streamType 

167 object_schema: ObjectType = configuration[object_type] 

168 

169 # get the coordinates for the stream 

170 coordinates = cls.default_stream_position(factory._unitop, port) 

171 

172 #get the group of the unitop that the stream is attached to 

173 group = factory._unitop.graphicObject.last().group 

174 

175 # create the stream 

176 stream = factory.create( 

177 object_type=object_type, 

178 object_schema=object_schema, 

179 coordinates=coordinates, 

180 flowsheet=factory._unitop.flowsheet, 

181 parentGroup=group, 

182 ) 

183 factory.perform_bulk_create() 

184 

185 # attach the stream to the port 

186 port.stream = stream 

187 port.save() 

188 

189 #populate the stream with compounds 

190 update_compounds_on_add_stream(port, stream) 

191 

192 return stream 

193 

194 

195 @classmethod 

196 def default_stream_position(cls, unitop: SimulationObject, port: Port) -> dict[str, float]: 

197 """ 

198 Returns the default position for a stream attached to the specified port. 

199 """ 

200 graphic_object = unitop.graphicObject.last() # the unit op only has one graphic object so this is okay. 

201 attached_ports = unitop.ports.filter(direction=port.direction) 

202 port_index = list(attached_ports).index(port) 

203 return port.default_stream_position(unitop, graphic_object, port_index, attached_ports.count()) 

204 

205 

206 def perform_bulk_create(self) -> None: 

207 """ 

208 Saves all created objects to the database. 

209 """ 

210 SimulationObject.objects.bulk_create(self.simulation_objects) 

211 GraphicObject.objects.bulk_create(self.graphic_objects) 

212 PropertySet.objects.bulk_create(self.property_sets) 

213 PropertyInfo.objects.bulk_create(self.property_infos) 

214 PropertyValue.objects.bulk_create(self.property_values) 

215 Port.objects.bulk_create(self.ports) 

216 RecycleData.objects.bulk_create(self.recycle_data) 

217 IndexedItem.objects.bulk_create(self.index_items) 

218 PropertyValueIntermediate.objects.bulk_create(self.property_value_indexed_items) 

219 

220 def create( 

221 self, object_type: str, object_schema: ObjectType, coordinates: dict[str, float], flowsheet: Flowsheet | None = None, parentGroup: Grouping | None = None, componentName: str | None = None, createPropertySet: bool = True 

222 ) -> SimulationObject: 

223 """ 

224 Creates a new simulation object 

225 does not save the instance to the database. 

226  

227 @param object_type: The type of object to create 

228 @param object_schema: The schema of the object 

229 @param coordinates: The coordinates of the object 

230 @param flowsheet: The flowsheet owner of the object 

231  

232 @return: The created simulation object, graphic object, property sets, property infos, property packages, and ports. 

233 this is so that the caller can create multiple objects at once and do a bulk_create 

234 """ 

235 if flowsheet is not None: 235 ↛ 238line 235 didn't jump to line 238 because the condition on line 235 was always true

236 idx_for_type = ObjectTypeCounter.next_for(flowsheet, object_type) 

237 else: 

238 last_sim_obj = SimulationObject.objects.last() 

239 if last_sim_obj is None: 

240 id = 1 

241 else: 

242 last_sim_obj = SimulationObject.objects.last() 

243 idx_for_type = (last_sim_obj.id + 1) if last_sim_obj else 1 

244 

245 if componentName is None: 

246 componentName = f"{object_schema.displayType}{idx_for_type}" 

247 

248 elif "stream" not in object_type and componentName != object_schema.displayName: 

249 componentName = componentName 

250 else: 

251 componentName = f"{componentName}{idx_for_type}" 

252 

253 

254 # create simulation object 

255 fields = { 

256 "objectType": object_type, 

257 "componentName": componentName, 

258 } 

259 instance = SimulationObject(**fields) 

260 self.simulation_objects.append(instance) 

261 idx = len(self._idx_map) 

262 self._idx_map[idx] = instance 

263 self._property_set_map[idx] = [] 

264 

265 # create index items 

266 self._index_items_map[idx] = {} 

267 for index_set in object_schema.indexSets: 

268 self.create_indexed_items(instance, index_set, idx) 

269 

270 graphic_object_schema = object_schema.graphicObject 

271 

272 graphicObject = GraphicObject( 

273 simulationObject=instance, 

274 x=coordinates["x"] - graphic_object_schema.width / 2, # center the object horizontally 

275 y=coordinates["y"] - graphic_object_schema.height / 2, # center the object vertically 

276 width = graphic_object_schema.width, 

277 height = graphic_object_schema.height, 

278 visible=True, 

279 group=parentGroup, 

280 flowsheet=flowsheet 

281 ) 

282 self.graphic_objects.append(graphicObject) 

283 

284 # If flowsheetKey is given, set flowsheet 

285 if flowsheet is not None: 285 ↛ 290line 285 didn't jump to line 290 because the condition on line 285 was always true

286 instance.flowsheet = flowsheet 

287 

288 

289 # Create property sets 

290 if createPropertySet: 290 ↛ 294line 290 didn't jump to line 294 because the condition on line 290 was always true

291 self.create_property_set(object_schema, idx) 

292 

293 # Create ports 

294 ports_schema = object_schema.ports or {} 

295 # replace any many=True ports with multiple ports 

296 for key, port_dict in ports_schema.items(): 

297 if port_dict.many: 

298 # replace this key value pair with a list of ports up to the default provided 

299 for i in range(port_dict.default): 

300 self.ports.append(Port( 

301 key=key, 

302 index=i, 

303 direction=port_dict.type, 

304 displayName=port_dict.displayName + f" {i+1}", 

305 unitOp=instance, 

306 flowsheet=flowsheet 

307 )) 

308 else: 

309 self.ports.append(Port( 

310 key=key, 

311 direction=port_dict.type, 

312 displayName=port_dict.displayName, 

313 unitOp=instance, 

314 flowsheet=flowsheet 

315 )) 

316 

317 # conditionally create attached objects based on object type 

318 if object_type == "recycle": 

319 self.recycle_data.append(RecycleData( 

320 simulationObject=instance, 

321 flowsheet=flowsheet 

322 )) 

323 

324 return instance 

325 

326 def store_old_properties(self, instance: SimulationObject): 

327 """ 

328 Stores the old properties of the stream. 

329 Used when switching stream types to keep properties that exist in both schema. 

330 Prevents losing user input in those properties. 

331 e.g. If Molar Flow has a value, switching to Humid Air won't remove that value. 

332 """ 

333 self.old_properties = {} 

334 if instance.properties: 334 ↛ exitline 334 didn't return from function 'store_old_properties' because the condition on line 334 was always true

335 for prop_info in instance.properties.containedProperties.all(): 

336 try: 

337 self.old_properties[prop_info.key] = prop_info.get_value_bulk() 

338 except ValueError: 

339 self.old_properties[prop_info.key] = None 

340 

341 def replace_the_gut(self, instance: SimulationObject): 

342 self.simulation_objects.append(instance) 

343 self._unitop = instance 

344 self._current_port = instance.connectedPorts.first() 

345 

346 object_schema = configuration[instance.objectType] 

347 idx = len(self._idx_map) 

348 self._idx_map[idx] = instance 

349 self._property_set_map[idx] = [] 

350 

351 # create index items 

352 self._index_items_map[idx] = {} 

353 for index_set in object_schema.indexSets: 

354 self.create_indexed_items(instance, index_set, idx) 

355 

356 self.create_property_set(object_schema, idx) 

357 

358 

359 def create_property_set(self, object_schema: ObjectType, idx: int) -> PropertySet: 

360 """ 

361 Creates a new SimulationObjectPropertySet instance with the specified schema. 

362 """ 

363 defaults = { 

364 'compoundMode': "", 

365 "simulationObject": self._idx_map[idx], 

366 } 

367 for schema in object_schema.propertySetGroups.values(): 

368 if schema.type == "composition": 

369 defaults['compoundMode'] = "MassFraction" 

370 

371 # Create SimulationObjectPropertySet 

372 instance = PropertySet(**defaults, flowsheet=self._flowsheet) 

373 

374 if object_schema.properties != {}: 374 ↛ 385line 374 didn't jump to line 385 because the condition on line 374 was always true

375 # Create PropertyInfo objects 

376 property_pairs = self.create_property_infos(instance, object_schema.properties, idx) 

377 

378 # set access based on the schema 

379 self.set_properties_access( 

380 config=object_schema, 

381 properties=property_pairs, 

382 idx=idx, 

383 ) 

384 

385 self.property_sets.append(instance) 

386 self._property_set_map[idx].append(instance) 

387 

388 return instance 

389 

390 

391 def create_property_infos(self, property_set: PropertySet, schema: PropertiesType, idx) -> List[Tuple[PropertyInfo, List[PropertyValue]]]: 

392 """ 

393 Creates PropertyInfo objects based on the specified schema. 

394 """ 

395 res: List[Tuple[PropertyInfo, List[PropertyValue]]] = [] 

396 

397 # if schema.type == "composition": 

398 # # the composition property set has no property infos, since these are compounds selected by the user 

399 # # TODO: Initialise the properties based on the compounds upstream (if any) 

400 # flowsheet = self._flowsheet 

401 for key, prop in schema.items(): 

402 fields = get_property_fields(key, prop, property_set) 

403 

404 if self.old_properties: 

405 # If property in the schema is also in the old properties, keep the value 

406 if key in self.old_properties.keys(): 

407 fields["value"] = self.old_properties[key] 

408 

409 new_property_info, new_property_values = self.create_property_info(idx, prop.indexSets, **fields) 

410 res.append((new_property_info, new_property_values)) 

411 

412 return res 

413 

414 

415 def create_property_info(self, idx, index_sets=None, **fields): 

416 value = fields.pop("value") 

417 property_info = PropertyInfo(**fields, flowsheet=self._flowsheet) 

418 self.property_infos.append(property_info) 

419 # Create a property value object with this value 

420 if index_sets == None: 

421 property_value = PropertyValue(value=value, property=property_info, flowsheet=self._flowsheet) 

422 self.property_values.append(property_value) 

423 return property_info, [property_value] 

424 else: 

425 combinations = self.get_combinations(self._index_items_map[idx], index_sets) 

426 property_values = [] 

427 for indexes in combinations: 

428 property_value = PropertyValue(value=value, property=property_info, flowsheet=self._flowsheet) 

429 for indexed_item in indexes: 

430 self.property_value_indexed_items.append( 

431 PropertyValueIntermediate(propertyvalue=property_value, indexeditem=indexed_item) 

432 ) 

433 property_values.append(property_value) 

434 self.property_values.extend(property_values) 

435 return property_info, property_values 

436 

437 

438 def get_combinations(self, index_items_map: dict[str, list[IndexedItem]], index_sets=[]) -> list[list[IndexedItem]]: 

439 """ 

440 Returns all possible combinations of indexed items for the specified index sets 

441 (taking one item from each index set). 

442 """ 

443 if index_items_map == {}: 443 ↛ 444line 443 didn't jump to line 444 because the condition on line 443 was never true

444 return [] 

445 indexes = [value for key, value in index_items_map.items() if key in index_sets] 

446 return list(itertools.product(*indexes)) 

447 

448 

449 def set_properties_access( 

450 self, 

451 config: ObjectType, 

452 properties: List[Tuple[PropertyInfo, List[PropertyValue]]], 

453 idx: int, 

454 ) -> None: 

455 if ( 

456 self.simulation_objects[-1].is_stream() 

457 and self._unitop.objectType != "recycle" 

458 and self._current_port and self._current_port.direction == ConType.Outlet 

459 ): 

460 # disable outlet/intermediate stream properties 

461 for propertyInfo, propVals in properties: 

462 for property_value in propVals: 

463 property_value.enabled = False 

464 else: 

465 index_map_items = self._index_items_map[idx] 

466 # TODO: Refactor this. 

467 # not disabled if it is a state variable 

468 for prop, propVals in properties: 

469 config_prop = config.properties.get(prop.key) 

470 # # figure out the number of items in the first index set 

471 if config_prop.indexSets: 

472 first_index_type = config_prop.indexSets[0] 

473 if first_index_type in index_map_items: 473 ↛ 480line 473 didn't jump to line 480 because the condition on line 473 was always true

474 if len(index_map_items[first_index_type]) == 0: 

475 last_item = None 

476 else: 

477 last_item = index_map_items[first_index_type][-1] 

478 # last_item is only needed on index sets. 

479 # it should be fine if it's undefined otherwise. 

480 for index, property_value in enumerate(propVals): 

481 if config_prop: 481 ↛ 484line 481 didn't jump to line 484 because the condition on line 481 was always true

482 group = config_prop.propertySetGroup 

483 else: 

484 group = "default" 

485 

486 config_group = config.propertySetGroups.get(group, None) 

487 if config_group is None: 487 ↛ 488line 487 didn't jump to line 488 because the condition on line 487 was never true

488 property_value.enabled = True 

489 elif config_group.type == "exceptLast" or config_prop.sumToOne: 

490 # TODO: Deprecate exceptLast in favor of sumToOne. See phase_seperator_config 

491 # Instead of using len_last_item, just use the last item in the index set 

492 indexed_item_links = [item for item in self.property_value_indexed_items 

493 if item.propertyvalue == property_value] 

494 indexed_items = [item.indexeditem for item in indexed_item_links] 

495 is_last_item = last_item in indexed_items 

496 

497 if is_last_item: 

498 property_value.enabled = False 

499 else: 

500 property_value.enabled = True 

501 elif config_group.type == "stateVars": 501 ↛ 505line 501 didn't jump to line 505 because the condition on line 501 was always true

502 state_vars = getattr(config_group, "stateVars") or () 

503 property_value.enabled = prop.key in state_vars 

504 else: # eg. All 

505 property_value.enabled = True 

506 

507 

508 def create_indexed_items(self, instance: SimulationObject, index_set: str, idx: int) -> None: 

509 """ 

510 Creates IndexedItem instances for the specified index set. 

511 """ 

512 items = [] 

513 

514 outlet_name = instance.schema.splitter_fraction_name 

515 match index_set: 

516 case "splitter_fraction": 

517 items = [] 

518 # create indexed items for the splitter_fraction index set 

519 items.append(IndexedItem( 

520 owner=instance, 

521 key="outlet_1", 

522 displayName= outlet_name + " 1", 

523 type=index_set, 

524 flowsheet=self._flowsheet 

525 )) 

526 items.append(IndexedItem( 

527 owner=instance, 

528 key="outlet_2", 

529 displayName= outlet_name + " 2", 

530 type=index_set, 

531 flowsheet=self._flowsheet 

532 )) 

533 

534 case "phase": 

535 # create indexed items for the phase index set 

536 items.append(IndexedItem( 

537 owner=instance, 

538 key="Liq", 

539 displayName="Liquid", 

540 type=index_set, 

541 flowsheet=self._flowsheet 

542 )) 

543 items.append(IndexedItem( 

544 owner=instance, 

545 key="Vap", 

546 displayName="Vapor", 

547 type=index_set, 

548 flowsheet=self._flowsheet 

549 )) 

550 

551 case "compound": 

552 pass # have to wait for user to select compounds 

553 self._index_items_map[idx][index_set] = items 

554 self.index_items.extend(items) 

555 

556 

557