Coverage for backend/flowsheetInternals/unitops/models/compound_propogation.py: 97%

128 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1import itertools 

2import copy 

3from typing import TYPE_CHECKING, Literal, TypedDict 

4 

5from core.auxiliary.models.IndexedItem import IndexedItem, IndexChoices 

6from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate 

7from core.auxiliary.models.PropertyInfo import PropertyInfo 

8from core.auxiliary.enums.unitOpGraphics import ConType 

9from core.auxiliary.enums.unitOpData import SimulationObjectClass 

10from flowsheetInternals.unitops.models.flow_tracking import track_stream_flow 

11from flowsheetInternals.unitops.config.config_methods import get_connected_port_keys 

12 

13if TYPE_CHECKING: 

14 from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

15 from flowsheetInternals.unitops.models.Port import Port 

16 

17 

18 

19def update_compounds_on_set(stream: "SimulationObject", expected_keys: list[str]) -> None: 

20 """ 

21 Directly sets the compounds in a stream to match the expected compounds. 

22 """ 

23 propagator = CompoundPropogation(stream, expected_keys) 

24 propagator.run() 

25 

26 

27def update_compounds_on_add_stream(port: "Port", stream: "SimulationObject") -> None: 

28 """ 

29 Handles compound propogation when a stream is added to a port. 

30 """ 

31 # no mole_frac_comp -> probably a bus 

32 if not stream.properties.ContainedProperties.filter(key="mole_frac_comp").exists(): 

33 return 

34 unitop = port.unitOp 

35 

36 

37 other_port_keys = get_connected_port_keys(port.key, unitop.schema) 

38 

39 other_ports = unitop.ports.filter( 

40 key__in=other_port_keys, 

41 ) 

42 streams = [port.stream for port in other_ports if port.stream] 

43 unique_compounds = set() 

44 for neighbour_stream in streams: 

45 unique_compounds.update(_get_compound_keys(neighbour_stream)) 

46 

47 

48 propagator = CompoundPropogation(stream, unique_compounds) 

49 propagator.run_for_stream(stream) 

50 

51 

52 

53def update_compounds_on_merge(inlet_stream: "SimulationObject", outlet_stream: "SimulationObject") -> None: 

54 """ 

55 Handles compound propogation when two streams are merged. 

56 """ 

57 expected_keys = _get_compound_keys(outlet_stream) 

58 other_expected_keys = _get_compound_keys(inlet_stream) 

59 # combine the lists, removing duplicates 

60 expected_keys.extend(other_expected_keys) 

61 expected_keys = list(set(expected_keys)) 

62 propogator = CompoundPropogation(inlet_stream, expected_keys) 

63 propogator.run(include_source=False) 

64 

65 

66 

67def _get_compound_keys(stream: "SimulationObject") -> list[str]: 

68 indexed_items = IndexedItem.objects.filter(owner=stream, type=IndexChoices.Compound) 

69 return [item.key for item in indexed_items] 

70 

71# "and" should never be in a function name, 

72# because one function should do one thing. 

73# TODO: Split this into two seperate functions. 

74def update_decision_node_and_propagate(decisionNode: "SimulationObject", updated_via_right_click: bool = False): 

75 """ 

76 Look through all the decision node's inlet streams to find a sum of the compounds 

77 Then call propagate for every outlet stream of the decision node 

78 """ 

79 compound_key_set = set() 

80 

81 for port in decisionNode.ports.all(): 

82 if port.stream: 

83 compound_key_set.update(_get_compound_keys(port.stream)) 

84 

85 # Create propagator with all compounds 

86 compound_keys = list(compound_key_set) 

87 

88 # Update the decision node itself 

89 propagator = CompoundPropogation(decisionNode, compound_keys) 

90 propagator.run() 

91 

92 

93def run_for_stream(stream: "SimulationObject", expected_compounds: list[str]) -> None: 

94 """ 

95 Runs the compound propogation for a specific stream (doesn't propogate to other streams). 

96 """ 

97 propagator = CompoundPropogation(stream, expected_compounds) 

98 propagator.run_for_stream(stream) 

99 

100 

101class StreamsToUpdateItem(TypedDict): 

102 add: list[str] 

103 remove: list[str] 

104 

105 

106class CompoundPropogation: 

107 """ 

108 This class handles the addition and removal of chemical compounds in process streams. 

109 It manages how compounds propagate through connected equipment (like pumps, mixers, etc.) 

110 """ 

111 def __init__(self, stream: "SimulationObject", expected_compounds: list[str]) -> None: 

112 self.source_stream: "SimulationObject" = stream 

113 self.expected_compounds: list[str] = expected_compounds 

114 self.create_property_values = [] 

115 self.create_indexed_items = [] 

116 self.create_property_value_indexed_items = [] 

117 self.delete_property_values = [] 

118 self.delete_indexed_items = [] 

119 

120 def _update_compounds(self, unitop: "SimulationObject") -> None: 

121 """ 

122 Updates the compounds in an object, deleting any that are no longer present 

123 and adding any that are now present. 

124 """ 

125 new_indexed_items = [] 

126 if "compound" in unitop.schema.indexSets: 

127 # Remove any compounds that are not in the list of expected compounds 

128 compound_indexed_items = IndexedItem.objects.filter(owner=unitop, type=IndexChoices.Compound) 

129 indexed_items_to_remove = compound_indexed_items.exclude(key__in=self.expected_compounds) 

130 indexed_items_to_keep = compound_indexed_items.filter(key__in=self.expected_compounds) 

131 

132 property_values_to_remove = PropertyValue.objects.filter(indexedItems__in=indexed_items_to_remove) 

133 property_values_to_remove.delete() 

134 indexed_items_to_remove.delete() 

135 

136 present_compounds: list[str] = indexed_items_to_keep.values_list('key', flat=True) 

137 for compound in self.expected_compounds: 

138 if compound not in present_compounds: 

139 # If the compound is not already present, create a new indexed item for it 

140 

141 new_index = IndexedItem(owner=unitop, key=compound, displayName=compound, type=IndexChoices.Compound, flowsheet=unitop.flowsheet) 

142 new_indexed_items.append(new_index) # List of new indexed items for this unit op 

143 self.create_indexed_items.append(new_index) # List of all new indexed items for all unit ops 

144 

145 # Create property info 

146 propertySet = unitop.properties 

147 

148 # Update compound-dependent properties ( e.g compound separator split fraction) 

149 for property in propertySet.ContainedProperties.all (): 

150 index_sets = unitop.schema.properties[property.key].indexSets 

151 

152 if index_sets is not None and "compound" in unitop.schema.properties[property.key].indexSets: 

153 self._property_add_remove(unitop,property,new_indexed_items) 

154 

155 # Note that this does not set the enabled status correctly, so we need to reevaluate_properties_enabled later.. 

156 

157 

158 def _property_add_remove(self, unitop:"SimulationObject", propertyInfo: "PropertyInfo",new_indexed_items: list[IndexedItem]) -> None: 

159 """ 

160 Adds or removes the indexes and property values from the propertyInfo 

161 """ 

162 # get the other indexes for this property 

163 schema = unitop.schema.properties[propertyInfo.key] 

164 group = schema.propertySetGroup 

165 enabled = propertyInfo.key in unitop.schema.propertySetGroups[group].stateVars 

166 other_indexes = IndexedItem.objects.filter( 

167 owner=unitop, type__in=schema.indexSets 

168 ).exclude(type=IndexChoices.Compound) 

169 

170 # The propertyValues to delete should already have been deleted by now. 

171 

172 indexes = [list(other_indexes), new_indexed_items] 

173 combinations = list(itertools.product(*indexes)) 

174 property_values = [] 

175 

176 if schema.sumToOne: 

177 # Leave the last combination as disabled, as they can be calculated from the other compounds. 

178 cutoff = len(combinations) - 1 - len(self.create_indexed_items) 

179 else: 

180 # all combinations should be enabled 

181 cutoff = len(combinations) 

182 for index, idxes in enumerate(combinations): 

183 if index <= cutoff: 

184 property_value = PropertyValue(value=schema.value, property=propertyInfo, enabled=enabled, flowsheet=unitop.flowsheet) 

185 else: 

186 property_value = PropertyValue(value=schema.value, property=propertyInfo, enabled=False, flowsheet=unitop.flowsheet) 

187 for indexed_item in idxes: 

188 self.create_property_value_indexed_items.append( 

189 PropertyValueIntermediate(propertyvalue=property_value, indexeditem=indexed_item) 

190 ) 

191 property_values.append(property_value) 

192 if len(combinations) == 0: 

193 # There are no other indexes other than for this compound. (e.g mole_frac_comp) 

194 # just create a property value for each indexed item 

195 for indexed_item in new_indexed_items: 

196 property_value = PropertyValue(value=schema.value, property=propertyInfo, enabled=enabled, flowsheet=unitop.flowsheet) 

197 property_values.append(property_value) 

198 self.create_property_value_indexed_items.append( 

199 PropertyValueIntermediate(propertyvalue=property_value, indexeditem=indexed_item) 

200 ) 

201 

202 if propertyInfo.key == "mole_frac_comp" and len(self.expected_compounds) == 1: 

203 # If this is a mole_frac_comp property and there is only one compound, set the value to 1.0 

204 for property_value in property_values: 

205 property_value.value = 1.0 

206 property_value.displayValue = 1.0 

207 

208 self.create_property_values.extend(property_values) 

209 

210 

211 def run(self, include_source: bool = True) -> None: 

212 """ 

213 Main method that orchestrates the entire compound update process: 

214 1. Identifies all affected streams 

215 2. Prepares database changes (additions and removals) 

216 3. Updates the database in bulk for better performance 

217 4. Handles read/write permissions for properties 

218  

219 Args: 

220 - include_source: Whether to include the source stream in the update 

221 """ 

222 

223 unit_ops, streams = track_stream_flow(self.source_stream) 

224 

225 for unit_op in unit_ops: 

226 self._update_compounds(unit_op) 

227 

228 for current_stream in streams: 

229 self._update_compounds(current_stream) 

230 

231 # Perform database changes 

232 self._handle_database_changes() 

233 

234 objects = streams | unit_ops 

235 for simulation_object in objects: 

236 simulation_object.reevaluate_properties_enabled() 

237 

238 def run_for_stream(self, stream: "SimulationObject") -> None: 

239 """ 

240 Runs the compound propogation only for a specific stream (doesn't propogate to other streams). 

241 """ 

242 self._update_compounds(stream) 

243 self._handle_database_changes() 

244 stream.reevaluate_properties_enabled() 

245 

246 def _handle_database_changes(self) -> None: # First  

247 """ 

248 Handles the database changes for the compound propogation based on the streams_to_update dictionary. 

249 """ 

250 

251 # Perform bulk operations 

252 if self.delete_property_values: 252 ↛ 253line 252 didn't jump to line 253 because the condition on line 252 was never true

253 PropertyValue.objects.filter(id__in=self.delete_property_values).delete() 

254 IndexedItem.objects.filter(id__in=self.delete_indexed_items).delete() 

255 if self.create_property_values: 

256 PropertyValue.objects.bulk_create(self.create_property_values) 

257 IndexedItem.objects.bulk_create(self.create_indexed_items) 

258 PropertyValueIntermediate.objects.bulk_create(self.create_property_value_indexed_items) 

259