Coverage for backend/flowsheetInternals/unitops/models/SimulationObject.py: 83%

465 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1from django.db import models 

2from core.managers import SoftDeleteManager 

3from core.auxiliary.enums.unitOpData import SimulationObjectClass 

4from core.auxiliary.models.Flowsheet import Flowsheet 

5from core.auxiliary.models.PropertySet import PropertySet 

6from core.auxiliary.models.PropertyInfo import PropertyInfo 

7from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate 

8from core.auxiliary.models.IndexedItem import IndexedItem, IndexChoices 

9from typing import Set 

10 

11 

12from flowsheetInternals.propertyPackages.models.SimulationObjectPropertyPackages import SimulationObjectPropertyPackages 

13from flowsheetInternals.unitops.models.Port import Port 

14from typing import Iterable 

15from flowsheetInternals.unitops.config.config_methods import * 

16from common.config_types import * 

17import itertools 

18 

19from .compound_propogation import update_compounds_on_set, update_compounds_on_merge, _get_compound_keys, update_decision_node_and_propagate, update_compounds_on_add_stream 

20from typing import Optional, List 

21from ..methods.add_expression import add_expression as _add_expression 

22 

23 

24class SimulationObject(models.Model): 

25 flowsheet = models.ForeignKey(Flowsheet, on_delete=models.CASCADE, related_name="flowsheetObjects") 

26 componentName = models.CharField(max_length=64) 

27 objectType = models.CharField(choices= SimulationObjectClass.choices) 

28 

29 created_at = models.DateTimeField(auto_now_add=True) 

30 is_deleted = models.BooleanField(default=False) 

31 initial_values = models.JSONField(null=True, blank=True) 

32 

33 #add a soft delete manager 

34 objects = SoftDeleteManager() 

35 add_expression = _add_expression 

36 

@property
def schema(self) -> ObjectType:
    """
    Returns the schema for this object.

    The lookup is delegated to ``get_object_schema``, which receives this
    instance as its only argument.
    """
    object_schema = get_object_schema(self)
    return object_schema

40 

@property
def has_recycle_connection(self) -> bool:
    """
    Returns True when a ``recycleConnection`` attribute can be read from this object.
    """
    try:
        self.recycleConnection
    except AttributeError:
        # Same contract as hasattr(): only a failed attribute lookup means "no"
        return False
    return True

44 

def is_stream(self) -> bool:
    """
    Returns the ``is_stream`` flag of this object's schema.
    """
    object_schema = self.schema
    return object_schema.is_stream

47 

def get_stream(self, key: str, index: int = 0) -> "SimulationObject":
    """
    Returns the stream attached to the port identified by ``key`` and ``index``.

    The port is resolved through ``get_port``, so any error it raises for a
    missing port propagates to the caller.
    """
    return self.get_port(key, index).stream

55 

def get_group(self) -> "Grouping":
    """
    Returns the group that this (non-stream) object belongs to.

    :raises ValueError: if this object is a stream
    """
    if self.is_stream():
        # Streams aren't really in a group, so there is no single answer
        raise ValueError("Streams do not belong to a group")
    # There is only one graphic object, so its group is the object's group
    graphic = self.graphicObject.last()
    return graphic.group

66 

def get_groups(self) -> Iterable["Grouping"]:
    """
    Returns a lazy iterable with the group of every graphic object of this object.
    Mostly needed for streams, which can be shown in many groups.
    """
    graphics = self.graphicObject.all()
    return (shown.group for shown in graphics)

72 

def get_parent_groups(self) -> List["Grouping"]:
    """
    Returns the list of groups enclosing this object, innermost first.

    Starts at this object's own group and repeatedly moves to the group of
    the simulation object that owns the current group, stopping when a group
    has no owning simulation object (or no group is returned).
    """
    chain = []
    group = self.get_group()
    while group:
        chain.append(group)
        owner = group.simulationObject
        # A group without an owning simulation object ends the walk
        group = owner.get_group() if owner is not None else None
    return chain

86 

def get_property_package(self, name: str | None = None) -> SimulationObjectPropertyPackages:
    """
    Returns a property package slot of this object.

    :param name: name of the slot to fetch; when None the first slot is returned
    :return: the matching property package slot
    """
    slots = self.propertyPackages
    if name is not None:
        return slots.get(name=name)
    return slots.first()

97 

98 

def set_property_package(self, property_package, name: str | None = None) -> None:
    """
    Stores ``property_package`` in one of this object's property package slots.

    :param property_package: the property package to assign
    :param name: slot name passed to ``get_property_package`` (None selects the first slot)
    """
    slot = self.get_property_package(name)
    slot.propertyPackage = property_package
    slot.save()

106 

107 

def get_port(self, key: str, index: int = 0) -> Port:
    """
    Returns the port of this object with the given key and index.

    :param key: port key to look up
    :param index: position among the ports sharing that key
    :raises ValueError: if no such port exists on this object
    """
    try:
        found: Port = self.ports.get(key=key, index=index)
    except Port.DoesNotExist:
        raise ValueError(f"Port with key {key} does not exist on object {self.componentName}")
    return found

118 

119 

def reorder_object_ports(self):
    """
    Renumbers this object's ports by the y position of the unit operation on
    the other end of each port's stream.

    Inlet and outlet ports are ordered independently; each port gets its
    position in the sorted order as its new ``index`` and is saved.
    :return: None
    """
    def upstream_y(port):
        # y of the unit op whose outlet feeds this inlet port's stream
        return port.stream.connectedPorts.get(direction=ConType.Outlet).unitOp.graphicObject.last().y

    def downstream_y(port):
        # y of the unit op whose inlet is fed by this outlet port's stream
        return port.stream.connectedPorts.get(direction=ConType.Inlet).unitOp.graphicObject.last().y

    ordered_inlets = sorted(self.ports.filter(direction=ConType.Inlet).all(), key=upstream_y)
    ordered_outlets = sorted(self.ports.filter(direction=ConType.Outlet).all(), key=downstream_y)

    for ordered_ports in (ordered_inlets, ordered_outlets):
        for position, port in enumerate(ordered_ports):
            port.index = position
            port.save()
    self.save()

136 

137 

def horizontally_center_graphic(self) -> None:
    """
    Horizontally centers GraphicObject for a stream or unit operation
    Currently uni-directional - left to right connections
    Centers based on inlet and outlet graphics.

    An intermediate stream (a stream type with more than one connected port)
    is placed midway between the unit operations on either side of it. Any
    other object is placed midway between the streams on its first inlet and
    first outlet port.
    :return: None
    """
    # TODO: Make this work as appropriate for streams with multiple graphic objects
    stream_types = (
        SimulationObjectClass.Stream,
        SimulationObjectClass.EnergyStream,
        SimulationObjectClass.acStream,
    )
    # Fetch the graphic once: every .last() call returns a new instance, so the
    # position has to be set and saved on the same object to be persisted.
    graphic = self.graphicObject.last()

    # Membership test instead of `a == X or Y or Z and ...`, which was always
    # true because the bare enum members are truthy.
    if self.objectType in stream_types and self.connectedPorts.count() > 1:
        # Is an Intermediate Stream: its inlet side is a unit op's outlet port
        inlet = self.connectedPorts.filter(direction=ConType.Outlet).first()
        outlet = self.connectedPorts.filter(direction=ConType.Inlet).first()
        inlet_graphic = inlet.unitOp.graphicObject.last()
        outlet_graphic = outlet.unitOp.graphicObject.last()
        inlet_x, outlet_x = inlet_graphic.x, outlet_graphic.x
        if inlet_x <= outlet_x:
            graphic.x = abs((inlet_x + inlet_graphic.width) + outlet_x) / 2
        else:
            graphic.x = abs(inlet_x + (outlet_x + outlet_graphic.width)) / 2
    else:
        inlet = self.ports.filter(direction=ConType.Inlet).first()
        outlet = self.ports.filter(direction=ConType.Outlet).first()
        graphic.x = abs((inlet.stream.graphicObject.last().x + outlet.stream.graphicObject.last().x) / 2)

    graphic.save()
    self.save()
    return

163 

164 

def vertically_center_graphic(self) -> None:
    """
    Vertically centers GraphicObject for a stream or unit operation

    An intermediate stream (a stream type with more than one connected port)
    is placed midway between the unit operations on either side of it. Any
    other object is placed midway between the stream on its first inlet port
    and the stream on its second outlet port.
    :return: None
    """
    # TODO: Make this work as appropriate for streams with multiple graphic objects
    stream_types = (
        SimulationObjectClass.Stream,
        SimulationObjectClass.EnergyStream,
        SimulationObjectClass.acStream,
    )
    # Fetch the graphic once: every .last() call returns a new instance, so the
    # position has to be set and saved on the same object to be persisted.
    graphic = self.graphicObject.last()

    # Membership test instead of `a == X or Y or Z and ...`, which was always
    # true because the bare enum members are truthy.
    if self.objectType in stream_types and self.connectedPorts.count() > 1:
        # Is an intermediate stream
        inlet = self.connectedPorts.filter(direction=ConType.Inlet).first()
        outlet = self.connectedPorts.filter(direction=ConType.Outlet).first()
        graphic.y = abs((inlet.unitOp.graphicObject.last().y + outlet.unitOp.graphicObject.last().y) / 2)
    else:
        inlet = self.ports.filter(direction=ConType.Inlet).first()
        # NOTE(review): this takes the SECOND outlet port ([1]) while the horizontal
        # variant uses the first; kept as-is, confirm whether that is intended.
        outlet = self.ports.filter(direction=ConType.Outlet).all()[1]
        graphic.y = abs((inlet.stream.graphicObject.last().y + outlet.stream.graphicObject.last().y) / 2)

    graphic.save()
    self.save()
    return

183 

184 

def split_stream(self) -> "SimulationObject":
    """
    Splits a stream into two separate streams (one inlet and one outlet - disconnected).

    A new stream is created at the inlet port this stream was connected to;
    this stream keeps its connection to the outlet port and is moved back to
    the default position for that port. Graphic objects are then divided
    between the two streams according to the parent groups of the unit
    operations on either side.
    :return: New Stream Object (outlet stream) if object is a stream, otherwise None.
    """
    from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
    from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject

    stream_types = (
        SimulationObjectClass.Stream,
        SimulationObjectClass.EnergyStream,
        SimulationObjectClass.acStream,
    )
    # Membership test instead of `a == X or Y or Z`, which was always true
    # because the bare enum members are truthy.
    if self.objectType not in stream_types:
        return None

    connectedPorts = self.connectedPorts.all()
    inlet_port: Port = connectedPorts.get(direction=ConType.Inlet)
    outlet_port: Port = connectedPorts.get(direction=ConType.Outlet)
    new_stream = SimulationObjectFactory.create_stream_at_port(inlet_port)
    new_stream.save()
    # reset the stream to default position
    coordinates = SimulationObjectFactory.default_stream_position(outlet_port.unitOp, outlet_port)
    stream_graphic_object = self.graphicObject.last()
    stream_graphic_object.x = coordinates['x'] - stream_graphic_object.width / 2
    stream_graphic_object.y = coordinates['y'] - stream_graphic_object.height / 2
    stream_graphic_object.save()

    # If a stream connects two groups together, it will have graphic objects in each of those groups.
    # This next section figures out which graphic object to keep, and which to move to the new stream.
    all_graphic_objects = self.graphicObject.all()
    default_graphic_object = new_stream.graphicObject.last()  # a default graphic object is created by create_stream_at_port
    # The graphic objects in these groups should be kept
    groups_to_keep: List[int] = [g.id for g in outlet_port.unitOp.get_parent_groups()]
    groups_to_move: List[int] = [g.id for g in inlet_port.unitOp.get_parent_groups()]

    for gobj in all_graphic_objects:
        if gobj.group.id in groups_to_keep:
            # keep it here
            if gobj.group.id in groups_to_move:
                # move the default graphic object into this group, so both the inlet and outlet show in this group
                default_graphic_object.group = gobj.group
                default_graphic_object.save()
        else:
            if gobj.group.id not in groups_to_move:
                print("Error: this should be in either ggroups to move or the other", gobj.group)
            # must be in groups_to_move
            # move graphic object to this stream
            gobj.simulationObject = new_stream
            gobj.save()

    # make sure both the original and new streams are shown in all relevant groups
    for graphic_object in self.graphicObject.all():
        current_group = graphic_object.group
        while current_group:
            parent_group = current_group.get_parent_group()
            if parent_group:
                # make sure the original stream is shown in the parent group
                if not self.graphicObject.filter(group=parent_group).exists():
                    GraphicObject.objects.create(
                        flowsheet=self.flowsheet,
                        simulationObject=self,
                        width=graphic_object.width,
                        height=graphic_object.height,
                        x=graphic_object.x,
                        y=graphic_object.y,
                        group=parent_group,
                    )

                # make sure the new stream is shown in the parent group
                if not new_stream.graphicObject.filter(group=parent_group).exists():
                    GraphicObject.objects.create(
                        flowsheet=self.flowsheet,
                        simulationObject=new_stream,
                        width=graphic_object.width,
                        height=graphic_object.height,
                        x=graphic_object.x,
                        y=graphic_object.y,
                        group=parent_group
                    )
            # Walk one level up; a None parent ends the while loop
            current_group = parent_group

    return new_stream

266 

def merge_parallel_streams(self, connected_stream: "SimulationObject", decision_node=None, coordinates=None) -> "SimulationObject":
    """
    Merges two streams that have the same direction (Both Inlet or Outlet)

    Both streams are attached to the first two ports of a decision node on
    the side opposite to the direction of this stream's first connected port.
    :param connected_stream: Stream to connect
    :param decision_node: Decision Node object to merge streams into (In the case of n-inlet / n-outlet).
        When None a new decision node with two inlets and two outlets is created.
    :param coordinates: Position for a newly created decision node; defaults to
        the position of this stream's graphic object.
    :return: Decision Node Object
    """
    from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
    is_inlet = self.connectedPorts.first().direction == ConType.Inlet
    direction = ConType.Outlet if is_inlet else ConType.Inlet
    if decision_node is None:
        modified_schema: ObjectType = configuration["decisionNode"].model_copy(deep=True)
        # Both directions get two ports regardless of which side is being merged
        modified_schema.ports["inlet"].default = 2
        modified_schema.ports["outlet"].default = 2
        if coordinates is None:
            coordinates = {'x': self.graphicObject.last().x, 'y': self.graphicObject.last().y}
        decision_node = SimulationObjectFactory.create_simulation_object(
            coordinates=coordinates,
            objectType="decisionNode",
            schema=modified_schema,
            flowsheet=self.flowsheet,
            create_attached_streams=False,
        )
        # Hold on to one instance: setting the rotation on one .last() result and
        # saving another would never persist the change.
        # NOTE(review): assumed to apply only to a newly created node - confirm.
        node_graphic = decision_node.graphicObject.last()
        node_graphic.rotation = self.connectedPorts.first().unitOp.graphicObject.last().rotation
        node_graphic.save()

    ports = decision_node.ports.filter(direction=direction).all()
    port1 = ports[0]
    port2 = ports[1]
    port1.stream = self
    port2.stream = connected_stream
    port1.save()
    port2.save()

    decision_node.save()

    # Center GraphicObjects
    self.horizontally_center_graphic()
    connected_stream.horizontally_center_graphic()
    self.save()
    connected_stream.save()

    return decision_node

311 

312 

def merge_stream(self, connectStream: "SimulationObject") -> "SimulationObject":
    """
    Connects this object (a stream) to another stream.

    Four cases are handled, based on how many ports each stream is connected
    to and on the direction of each stream's first connected port:

    1. Both streams are intermediate (more than one connected port each):
       both are split and all four halves are joined through a decision node.
    2. Exactly one stream is intermediate: it is turned into a decision node
       and the other stream is attached to the node's free port.
    3. Both streams have the same direction: they are merged in parallel
       through a decision node.
    4. The directions differ: the two streams are fused into one; the
       stream on the outlet side is preserved and the other is deleted.

    :param connectStream: Stream to connect to this object
    :return: Decision Node Object for cases 1-3, the preserved stream for case 4
    """
    material_stream_1 = self
    material_stream_2 = connectStream

    material_stream_1_port = material_stream_1.connectedPorts.first()
    material_stream_2_port = material_stream_2.connectedPorts.first()

    # Case 1: Intermediate to Intermediate
    if material_stream_1.connectedPorts.count() > 1 and material_stream_2.connectedPorts.count() > 1:
        # Remember where the target stream was before splitting moves it
        coordinates = {'x': material_stream_2.graphicObject.last().x, 'y': material_stream_2.graphicObject.last().y}
        outlet_stream_1 = material_stream_1.split_stream()
        outlet_stream_2 = material_stream_2.split_stream()

        # First call creates the node, second call reuses it for the other two halves
        decision_node = outlet_stream_2.merge_parallel_streams(outlet_stream_1, coordinates=coordinates)
        decision_node = material_stream_2.merge_parallel_streams(material_stream_1, decision_node)

        update_decision_node_and_propagate(decision_node)
        decision_node.save()
        return decision_node

    # Case 2: Connecting intermediate stream with product or feed
    if material_stream_1.connectedPorts.count() > 1 or material_stream_2.connectedPorts.count() > 1:
        # The stream with more connected ports is the intermediate one
        inter_stream, connected_stream = (
            (material_stream_1, material_stream_2) if material_stream_1.connectedPorts.count() > material_stream_2.connectedPorts.count()
            else (material_stream_2, material_stream_1)
        )
        connected_stream_port = connected_stream.connectedPorts.first()
        is_inlet = (connected_stream_port.direction == ConType.Inlet)

        if is_inlet:
            # Feed -> Intermediate
            decision_node = inter_stream.make_decision_node(num_inlets=1, num_outlets=2)
            empty_port = decision_node.ports.filter(direction=ConType.Outlet, stream=None).first()
            empty_port.stream = connected_stream
            empty_port.save()

            # Center graphics
            self.horizontally_center_graphic()
            connected_stream.horizontally_center_graphic()
            self.save()
            connected_stream.save()

            # Update compounds - First update decision node, then propagate
            update_decision_node_and_propagate(decision_node)
            return decision_node
        else:
            # Product -> Intermediate
            decision_node = inter_stream.make_decision_node(num_inlets=2, num_outlets=1)
            empty_port = decision_node.ports.filter(direction=ConType.Inlet, stream=None).first()
            empty_port.stream = connected_stream
            empty_port.save()

            # Center graphics
            self.horizontally_center_graphic()
            connected_stream.horizontally_center_graphic()
            self.save()
            connected_stream.save()

            # Update compounds - First update decision node, then propagate
            update_decision_node_and_propagate(decision_node)
            return decision_node

    # Case 3: Feed to feed or product to product
    if material_stream_1_port.direction == material_stream_2_port.direction:
        result = material_stream_2.merge_parallel_streams(material_stream_1)
        update_decision_node_and_propagate(result)
        return result

    # Case 4: Feed to product or product to feed
    if material_stream_1_port.direction == "inlet":
        inlet_stream, outlet_stream = material_stream_1, material_stream_2
        inlet_port, outlet_port = material_stream_1_port, material_stream_2_port
    else:
        inlet_stream, outlet_stream = material_stream_2, material_stream_1
        inlet_port, outlet_port = material_stream_2_port, material_stream_1_port

    # Get all the graphic objects to preserve

    # Always preserve the graphic of the stream that is being connected to
    # NOTE(review): preserve_graphic is not read again in this method
    preserve_graphic = material_stream_2.graphicObject.last()
    # Update compounds (skipped for energy streams and AC streams)
    if inlet_stream.objectType == SimulationObjectClass.EnergyStream:
        pass
    elif inlet_stream.objectType != SimulationObjectClass.acStream:
        update_compounds_on_merge(inlet_stream, outlet_stream)

    def update_graphic_object_on_merge(preserve_stream,delete_stream) -> None:
        # Preserve one graphic object for each group either stream is connected to
        # If the stream is connected to multiple groups, it will be shown in both groups
        # NOTE(review): preserve_stream_groups is not read again in this helper
        preserve_stream_groups = list(preserve_stream.get_groups())
        preserve_stream_gobjs = preserve_stream.graphicObject.all()
        merged_gobjs = []
        for gobj in delete_stream.graphicObject.all():
            # Check if there's a preserved stream in this group
            preserved_gobj = next((g for g in preserve_stream_gobjs if g.group == gobj.group), None)
            if preserved_gobj is not None:
                # Both streams are shown in this group: keep only the preserved stream's graphic
                if preserve_stream == material_stream_2:
                    # Keep the preserved graphic object's own position
                    gobj.delete()
                else:
                    # Keep the position of the stream to be deleted
                    preserved_gobj.copy_position_from(gobj)
                    gobj.delete()
                # If both streams are shown in multiple parent groups, we only want to show the
                # stream at the lowest level. Make a list of these merged graphic objects so we can remove
                # the extras later.
                merged_gobjs.append(preserved_gobj)
            else:
                # connect this graphic object to the preserved stream
                gobj.simulationObject = preserve_stream
                gobj.save()

        # Of the merged graphics objects, only keep the one in the lowest group
        parent_groups: list[int] = []

        for gobj in merged_gobjs:
            parent_group = gobj.group.get_parent_group()
            if parent_group is not None:
                parent_groups.append(parent_group.pk)

        for gobj in merged_gobjs:
            # NOTE(review): the delete is read as belonging to this `if` (only graphics whose
            # group is a parent of another merged graphic's group are removed) - confirm nesting
            if gobj.group.pk in parent_groups:
                pass
                gobj.delete()

    from flowsheetInternals.unitops.models.delete_factory import DeleteFactory
    if not outlet_stream.has_recycle_connection and inlet_stream.has_path_to(outlet_stream):
        # Fusing these two streams closes a loop in the flowsheet
        # preserve the outlet stream
        preserve_stream = outlet_stream
        inlet_port.stream = preserve_stream
        inlet_port.save()
        DeleteFactory.delete_object(inlet_stream)

        update_graphic_object_on_merge(preserve_stream, inlet_stream)

        # attach a recycle block to the preserved (outlet) stream
        # this also handles updating property access
        outlet_stream.attach_recycle()
        inlet_stream.delete_control_values()

        outlet_stream.reevaluate_properties_enabled()
    else:
        # Preserve the outlet stream
        preserve_stream = outlet_stream
        inlet_port.stream = preserve_stream
        inlet_port.save()
        DeleteFactory.delete_object(inlet_stream)
        # Update graphic object
        update_graphic_object_on_merge(preserve_stream, inlet_stream)

    return preserve_stream

470 

def delete_control_values(self) -> None:
    """
    Deletes the control set point of every property value in this object
    that reports itself as a control set point.
    """
    contained_properties = self.properties.ContainedProperties.all()
    for contained in contained_properties:
        for candidate in contained.values.all():
            if not candidate.is_control_set_point():
                continue
            candidate.controlSetPoint.delete()

480 

481 

def attach_recycle(self) -> None:
    """
    Attaches a new recycle block to the stream.

    Nothing happens when the stream already has a recycle connection.
    Otherwise a "recycle" object is created 100 units below the centre of
    this stream's graphic object and its recycle data is updated with this
    stream.
    """
    if not self.has_recycle_connection:
        from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
        centre_x = self.graphicObject.last().x + self.graphicObject.last().width / 2
        below_centre_y = self.graphicObject.last().y + self.graphicObject.last().height / 2 + 100
        recycle = SimulationObjectFactory.create_simulation_object(
            objectType="recycle",
            flowsheet=self.flowsheet,
            coordinates={'x': centre_x, 'y': below_centre_y},
        )
        recycle.recycleData.update(self)
        recycle.save()

499 

500 

def has_path_to(self, end_stream: "SimulationObject", check_recycles=True) -> bool:
    """
    Checks if there is a path in the flowsheet from the start stream (self) to the end stream
    Can be used to check for loops in the flowsheet if these two streams are being merged

    The search follows each stream to the unit operation it enters and then
    continues along that unit operation's outlet ports whose keys are
    reported by ``get_connected_port_keys`` for the entered port. Each stream
    is expanded at most once, so the search terminates even when the
    flowsheet already contains a loop.

    - param end_stream: The stream to check if there is a path to (from self)
    - param check_recycles: If True, will skip the path if a stream has a recycle connection
    - return: True if end_stream is reachable from self, otherwise False
    """
    remaining_streams = [self]
    visited_pks = set()
    while remaining_streams:
        current_stream = remaining_streams.pop()
        if current_stream == end_stream:
            # loop detected
            return True
        # Skip streams that were already expanded; without this an existing
        # cycle would keep the search running forever
        if current_stream.pk in visited_pks:
            continue
        visited_pks.add(current_stream.pk)
        unit_op_port = current_stream.connectedPorts.filter(direction=ConType.Inlet).first()
        if not unit_op_port:
            # The stream does not enter any unit operation: dead end
            continue
        unit_op = unit_op_port.unitOp
        # get the outlet ports of the unitop
        connected_port_keys = get_connected_port_keys(unit_op_port.key, unit_op.schema)
        outlet_ports = unit_op.ports.filter(
            direction=ConType.Outlet,
            key__in=connected_port_keys
        )
        for outlet_port in outlet_ports:
            outlet_stream = outlet_port.stream
            if outlet_stream is not None:
                if check_recycles and outlet_stream.has_recycle_connection:
                    continue
                remaining_streams.append(outlet_stream)
    return False

532 

533 

def make_decision_node(self, num_inlets, num_outlets):
    """
    Turns a stream into a decision node with n inlet and m outlet ports.

    The node is created at the position of this stream's graphic object.
    If the stream has more than one connected port, it becomes the node's
    first inlet and a new stream is created from the node's first outlet to
    the port this stream used to feed. Otherwise the stream is attached to
    the node's first port.
    :param num_inlets: number of inlet ports to create
    :param num_outlets: number of outlet ports to create
    :return: Decision Node object, or None when this object is not a material stream
    """
    from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
    if self.objectType != SimulationObjectClass.Stream:
        return None

    # Create Decision Node
    node_schema: ObjectType = configuration["decisionNode"].model_copy(deep=True)
    node_schema.ports["inlet"].default = num_inlets
    node_schema.ports["outlet"].default = num_outlets
    stream_graphic = self.graphicObject.last()  # For now, we are assuming this only has one graphicObject.
    node = SimulationObjectFactory.create_simulation_object(
        coordinates={'x': stream_graphic.x, 'y': stream_graphic.y},
        objectType="decisionNode",
        schema=node_schema,
        flowsheet=self.flowsheet,
        create_attached_streams=False,
    )

    is_intermediate = self.connectedPorts.count() > 1
    if not is_intermediate:
        # For now, only dealing with the case that a regular stream is initialized with one port
        first_port = node.ports.first()
        first_port.stream = self
        first_port.save()
        self.horizontally_center_graphic()
        self.save()
    else:
        # Connect both streams to Decision Node
        fed_port = self.connectedPorts.filter(direction=ConType.Inlet).first()
        node_outlet = node.ports.filter(direction=ConType.Outlet).first()
        node_inlet = node.ports.filter(direction=ConType.Inlet).first()
        new_stream = SimulationObjectFactory.create_stream_at_port(port=node_outlet)
        fed_port.stream = new_stream
        node_inlet.stream = self

        # Save Objects
        fed_port.save()
        node_outlet.save()
        node_inlet.save()
        node.save()

        # Center GraphicObjects
        self.horizontally_center_graphic()
        new_stream.horizontally_center_graphic()
        self.save()
        new_stream.save()

    self.save()
    node.save()
    # handle compounds for decision node
    update_decision_node_and_propagate(node, updated_via_right_click=True)
    return node

590 

591 

def update_compounds(self, compounds: list[str]) -> None:
    """
    Sets the compounds of this stream by delegating to ``update_compounds_on_set``.

    :param compounds: compounds to hand to ``update_compounds_on_set``
    """
    update_compounds_on_set(self, compounds)
    return None

597 

598 

def add_port(self, key: str, existing_stream: "SimulationObject | None" = None) -> Port:
    """
    Adds a port to this object and adds a new stream if none is provided.

    The new port takes the next free index among the ports sharing ``key``.
    When ``existing_stream`` is given it is connected to the new port;
    otherwise a stream is created at the port and, for outlet ports,
    splitter-fraction property values are created for the new outlet.

    :param key: key of the port definition in this object's schema
    :param existing_stream: stream to connect to the new port, if any
    :return: the newly created port
    """
    from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
    # Create ports
    ports_schema = self.schema.ports

    # replace any many=True ports with multiple ports
    port_dict = ports_schema[key]
    # get the next index for the port (the number of existing ports with the same key)
    next_index = self.ports.filter(key=key).count()
    new_port = Port(
        key=key,
        index=next_index,
        direction=port_dict.type,
        displayName=port_dict.displayName + f" {next_index+1}",
        unitOp=self,
        flowsheet=self.flowsheet
    )
    new_port.save()

    # The graphic may need to grow to fit the extra port
    self.update_height()

    if existing_stream:
        # connect existing stream to the new port
        new_port.stream = existing_stream
        new_port.save() #might have to remove this trying to figure out why other streams are being removed

        if self.objectType == "decisionNode":
            # Use existing function to update decision node compounds
            update_decision_node_and_propagate(self)
    else:
        # create a new stream at the port using the factory
        new_stream = SimulationObjectFactory.create_stream_at_port(new_port)
        update_compounds_on_add_stream(new_port, new_stream)
        outlet_name = self.schema.splitter_fraction_name
        for property_key, property in self.schema.properties.items():
            # Only properties indexed by splitter fraction need values for a new outlet
            if property.indexSets is not None and ('splitter_fraction' in property.indexSets) and new_port.direction == ConType.Outlet:
                property_infos = self.properties.ContainedProperties.all()
                # NOTE(review): takes the first contained property without matching property_key - confirm
                property_info = property_infos.first()
                new_indexed_item = IndexedItem.objects.create(
                    owner=self,
                    key="outlet_" + f"{next_index+1}",
                    displayName= outlet_name + f" {next_index+1}",
                    type="splitter_fraction",
                    flowsheet=self.flowsheet,
                )

                # get indexed set for split_fraction or priorities:
                index_sets = property.indexSets

                # figure out which other indexed items this should link to
                indexed_item_sets : List[List[IndexedItem]]= []
                for index in index_sets:
                    if index == IndexChoices.SplitterFraction:
                        continue # We don't need this, it's the one we're editing
                    indexed_items : List[IndexedItem] = self.get_indexed_items(index) # e.g get_indexed_items("phase"), "compound", etc
                    indexed_item_sets.append(indexed_items)

                # create a property value for each combination of indexed items, for the new outlet
                combinations = list(itertools.product(*indexed_item_sets))

                # Headers create their new values enabled; every other object type
                # instead enables all existing values of the property
                property_status = False
                if self.objectType == "header" or self.objectType == "simple_header":
                    property_status = True
                else:
                    property_info.values.update(enabled=True)

                # collect the index links for the new values (disabled unless this is a header)
                index_links_to_create : List[PropertyValueIntermediate] = []
                for combination in combinations:
                    # Create a propertyValue for this combination
                    combo_property_value = PropertyValue.objects.create(property=property_info, enabled=property_status, flowsheet=self.flowsheet)
                    # link it up to the new outlet
                    index_links_to_create.append(
                        PropertyValueIntermediate(propertyvalue=combo_property_value, indexeditem=new_indexed_item)
                    )

                    #PropertyValueIntermediate.objects.create(propertyvalue=combo_property_value, indexeditem=new_indexed_item)
                    # Also link it up to all the other sets
                    for indexed_item in combination:
                        #PropertyValueIntermediate.objects.create(propertyvalue=combo_property_value, indexeditem=indexed_item)

                        index_links_to_create.append(
                            PropertyValueIntermediate(propertyvalue=combo_property_value, indexeditem=indexed_item)
                        )

                # Bulk create the links
                PropertyValueIntermediate.objects.bulk_create(index_links_to_create)

            # NOTE(review): read as running once per schema property (inside the loop) - confirm nesting
            new_stream.save()

    return new_port

697 

def update_height(self):
    """
    Updates the height of the graphic object based on the number of ports.

    Only applies when the schema's graphic object has ``autoHeight`` set
    (e.g. a header); the height becomes 200 per port on the busier side.
    """
    if not self.schema.graphicObject.autoHeight:
        return
    inlet_count = self.ports.filter(direction=ConType.Inlet).count()
    outlet_count = self.ports.filter(direction=ConType.Outlet).count()
    tallest_side = max(inlet_count, outlet_count)
    self.graphicObject.update(height=tallest_side * 200)

705 

706 

def get_indexed_items(self, index_set_type: IndexChoices) -> List[IndexedItem]:
    """
    Return the indexed items owned by this object for one index set type.

    Only ``IndexChoices.Phase`` and ``IndexChoices.Compound`` are supported;
    the result is the queryset of IndexedItem rows with ``owner=self`` and
    the matching ``type``.

    Raises:
        ValueError: if ``index_set_type`` is any other value.
    """
    # Checked in the same order as before: phase first, then compound.
    for supported_type in (IndexChoices.Phase, IndexChoices.Compound):
        if index_set_type == supported_type:
            return IndexedItem.objects.filter(owner=self, type=supported_type).all()

    raise ValueError("Get_indexed_items didn't expect this index set type")

718 

719 

720 

def merge_decision_nodes(self, decision_node_active: "SimulationObject", decision_node_over: "SimulationObject") -> Optional["SimulationObject"]:
    """
    Fold ``decision_node_active`` into ``decision_node_over``.

    Every stream attached to the active node's inlet and outlet ports is
    added to the over node via ``add_port``, compounds are re-propagated
    after the inlets have moved, and the active node is then removed
    through DeleteFactory.

    NOTE(review): any graphics repositioning presumably happens inside
    ``add_port`` / ``delete_object``; nothing here positions graphics directly.

    Returns:
        The surviving node (``decision_node_over``).
    """
    # Imported here rather than at module level (presumably to avoid a circular import).
    import flowsheetInternals.unitops.models.delete_factory as DeleteFactory

    # Both querysets are built up front; they are only evaluated when iterated.
    active_inlet_ports = decision_node_active.ports.filter(direction="inlet").all()
    active_outlet_ports = decision_node_active.ports.filter(direction="outlet").all()

    # Move the inlet streams across first.
    for port in active_inlet_ports:
        decision_node_over.add_port("inlet", port.stream)

    # With the new inlets in place, refresh compounds and push them downstream.
    update_decision_node_and_propagate(decision_node_over)

    # Then move the outlet streams.
    for port in active_outlet_ports:
        decision_node_over.add_port("outlet", port.stream)

    # The active node is now empty: delete it and persist the merged node.
    DeleteFactory.delete_object(decision_node_active)
    decision_node_over.save()
    return decision_node_over

750 

751 

def reevaluate_properties_enabled(self) -> None:
    """
    Reevaluates property access for all properties in this object

    This should only really be called when connections are changed,
    otherwise the property enabling should be handled by adding
    or removing control values.

    Rules applied to each contained property:
      * Machine learning blocks are skipped entirely (defaults are kept).
      * For a stream with two connected ports, or with a single connected
        port whose direction is "outlet", every property is disabled.
      * A property whose config group cannot be found is enabled
        (e.g. custom properties).
      * In a "stateVars" group only keys listed in the group's stateVars
        are enabled.
      * Any other group type is enabled.

    The resulting PropertyValue rows are written with one bulk_update on
    the ``enabled`` field.
    """
    if self.objectType == SimulationObjectClass.MachineLearningBlock:
        return  # We don't want to change from the defaults

    properties = self.properties.ContainedProperties.all()
    config = self.schema
    config_properties = config.properties
    config_groups = config.propertySetGroups

    # The connection state is identical for every property, so resolve it once
    # here instead of re-querying connectedPorts for each property in the loop.
    stream_values_locked = False
    if self.is_stream():
        ports = list(self.connectedPorts.all())
        # disable outlet/intermediate stream properties
        stream_values_locked = len(ports) == 2 or (
            len(ports) == 1 and ports[0].direction == "outlet"
        )

    def _eval_enabled(prop: PropertyInfo, config_group) -> bool:
        if stream_values_locked:
            return False
        if config_group is None:
            return True  # default to enabled, e.g for custom properties
        if config_group.type == "stateVars":
            return prop.key in (config_group.stateVars or ())
        return True  # eg. All, default to enabled

    values_to_update = []
    for prop in properties:
        config_prop = config_properties.get(prop.key)
        # Properties missing from the schema fall back to the "default" group.
        group = config_prop.propertySetGroup if config_prop else "default"
        config_group = config_groups.get(group, None)
        values_to_update.extend(prop.enable(_eval_enabled(prop, config_group)))

    PropertyValue.objects.bulk_update(values_to_update, ["enabled"])

792 

793 

def get_unspecified_properties(self) -> list:
    """
    Return the keys of properties that still need a value.

    A property key is reported when:
      * it is listed in the stateVars of a "stateVars", "composition" or
        "exceptLast" property set group (groups whose toggle property is
        currently falsy are ignored) and ``isSpecified()`` is false; or
      * it is "mole_frac_comp" and either has no values or its numeric
        values do not sum to 1 within a tolerance of 0.001. This check is
        applied whether or not the key is in a required group.

    Returns:
        A list of property keys; empty when the object has no property set.
    """
    if not hasattr(self, "properties") or not self.properties:
        return []

    contained_properties = self.properties.ContainedProperties.all()

    # use schema stateVars to find required properties
    required_properties: set[str] = set()
    for group in self.schema.propertySetGroups.values():
        if group.toggle and not self.properties.get_property(group.toggle).get_value():
            # The group is toggled off, so we don't care that the properties are not specified
            continue
        if group.type in ("stateVars", "composition", "exceptLast"):
            # stateVars may be unset on a group; treat that as "nothing required"
            required_properties.update(group.stateVars or ())

    unspecified_properties = [
        property_info.key
        for property_info in contained_properties
        if not property_info.isSpecified() and property_info.key in required_properties
    ]

    # check if mole_frac_comp sums to 1
    for prop in contained_properties:
        if prop.key != "mole_frac_comp":
            continue

        if prop.has_value_bulk():
            total = 0.0
            for value in prop.values.all():
                try:
                    total += float(value.value)
                except (ValueError, TypeError):
                    # non-numeric entries are ignored in the sum
                    continue
            composition_invalid = abs(total - 1.0) > 0.001
        else:
            # no values at all counts as unspecified
            composition_invalid = True

        if composition_invalid and "mole_frac_comp" not in unspecified_properties:
            unspecified_properties.append("mole_frac_comp")

    return unspecified_properties

838 

def delete(self, *args, **kwargs):
    """
    Direct deletion is intentionally blocked on this model.

    Raises:
        NotImplementedError: always; callers are pointed at DeleteFactory's
            ``delete_object`` instead.
    """
    message = "Use delete_object method from DeleteFactory"
    raise NotImplementedError(message)

841 

def permanently_delete(self, *args, **kwargs):
    """
    Remove this row from the database for good.

    Calls the parent class's ``delete`` directly, bypassing the overridden
    ``delete`` on this model (which always raises). Intended for tests, or
    for cases where a permanent deletion is definitely wanted.
    All positional and keyword arguments are forwarded unchanged; the
    parent's return value is discarded.
    """
    super().delete(*args, **kwargs)

848 

849