Coverage for backend/django/flowsheetInternals/unitops/models/compound_propogation.py: 96%

135 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1import itertools 

2from typing import TYPE_CHECKING, Literal, TypedDict 

3 

4from core.auxiliary.models.IndexedItem import IndexedItem, IndexChoices 

5from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate 

6from core.auxiliary.models.PropertyInfo import PropertyInfo 

7from flowsheetInternals.unitops.models.flow_tracking import track_stream_flow 

8from flowsheetInternals.unitops.config.config_methods import get_connected_port_keys 

9if TYPE_CHECKING: 

10 from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

11 from flowsheetInternals.unitops.models.Port import Port 

12 

13 

14 

def update_compounds_on_set(stream: "SimulationObject", expected_keys: list[str]) -> None:
    """
    Set the compounds of a stream to exactly the expected compounds.

    Duplicate keys are dropped (the first occurrence wins and the original
    order is kept) before a CompoundPropogation is built for the stream and run.
    """
    deduplicated_keys: list[str] = []
    already_seen: set[str] = set()
    for compound_key in expected_keys:
        if compound_key not in already_seen:
            already_seen.add(compound_key)
            deduplicated_keys.append(compound_key)

    CompoundPropogation(stream, deduplicated_keys).run()

23 

24 

def update_compounds_on_add_stream(port: "Port", stream: "SimulationObject") -> None:
    """
    Handle compound propagation when a stream is added to a port.

    The stream is given the combined compounds of every stream attached to the
    ports that the unit op's schema connects to `port`. If at least one such
    neighbouring stream exists, the stream also takes the property package type
    of the last one visited and is saved.

    Streams without a "mole_frac_comp" property are left untouched.
    When no connected port carries a stream, the propagation runs with an
    empty compound list and the property package type is left unchanged.
    """
    # No mole_frac_comp -> probably a bus, so there are no compounds to manage.
    if not stream.properties.ContainedProperties.filter(key="mole_frac_comp").exists():
        return

    unitop = port.unitOp
    other_port_keys = get_connected_port_keys(port.key, unitop.schema)
    other_ports = unitop.ports.filter(key__in=other_port_keys)
    streams = [other_port.stream for other_port in other_ports if other_port.stream]

    # Bind the name before the loop: with no neighbouring streams the loop body
    # never runs, and the check further down must still be able to read it.
    neighbour_stream = None
    collected_keys: list[str] = []
    for neighbour_stream in streams:
        collected_keys.extend(_get_compound_keys(neighbour_stream))

    # Drop duplicates but keep first-seen order so the result is deterministic.
    unique_compounds = list(dict.fromkeys(collected_keys))

    propagator = CompoundPropogation(stream, unique_compounds)
    propagator.run_for_stream(stream)

    if neighbour_stream is not None:
        # Inherit the property package of the last neighbouring stream visited.
        stream.propertyPackageType = neighbour_stream.propertyPackageType
        stream.save()

52 

53 

def update_compounds_on_merge(inlet_stream: "SimulationObject", outlet_stream: "SimulationObject") -> None:
    """
    Handle compound propagation when two streams are merged.

    The compounds of both streams are combined (outlet keys first, then any
    extra inlet keys) and propagated starting from the inlet stream. The inlet
    stream is then assigned the outlet stream's property package type and
    `propogate_property_package` is called on it.
    """
    outlet_keys = _get_compound_keys(outlet_stream)
    inlet_keys = _get_compound_keys(inlet_stream)

    # Combine the lists, removing duplicates while keeping first-seen order so
    # the propagated compound order does not vary between runs.
    expected_keys = list(dict.fromkeys([*outlet_keys, *inlet_keys]))

    propogator = CompoundPropogation(inlet_stream, expected_keys)
    # NOTE(review): run() accepts include_source but does not read it, so this
    # flag currently has no effect - confirm the intended behaviour.
    propogator.run(include_source=False)

    # Imported locally rather than at module level (presumably to avoid a
    # circular import - TODO confirm).
    from flowsheetInternals.unitops.models.property_package_propogation import propogate_property_package

    # The attribute is not saved here; presumably propogate_property_package
    # persists it - TODO confirm.
    inlet_stream.propertyPackageType = outlet_stream.propertyPackageType
    propogate_property_package(inlet_stream)

69 

70 

def _get_compound_keys(stream: "SimulationObject") -> list[str]:
    """Return the keys of the compound-type IndexedItems owned by the given object."""
    keys: list[str] = []
    for compound_item in IndexedItem.objects.filter(owner=stream, type=IndexChoices.Compound):
        keys.append(compound_item.key)
    return keys

74 

75# "and" should never be in a function name, 

76# because one function should do one thing. 

77# TODO: Split this into two separate functions. 

def update_decision_node_and_propagate(decisionNode: "SimulationObject", updated_via_right_click: bool = False):
    """
    Collect the compounds of every stream attached to the decision node and
    propagate the combined list starting from the decision node itself.

    Args:
        decisionNode: The decision node whose ports are inspected.
        updated_via_right_click: Not read by this function; kept so existing
            callers that pass it continue to work.
    """
    collected_keys: list[str] = []
    for port in decisionNode.ports.all():
        if port.stream:
            collected_keys.extend(_get_compound_keys(port.stream))

    # Drop duplicates but keep first-seen order so the propagated compound
    # order is deterministic.
    compound_keys = list(dict.fromkeys(collected_keys))

    # Update the decision node itself; run() follows the stream flow from there.
    propagator = CompoundPropogation(decisionNode, compound_keys)
    propagator.run()

95 

96 

def run_for_stream(stream: "SimulationObject", expected_compounds: list[str]) -> None:
    """
    Apply the expected compounds to a single stream only.

    Nothing is propagated to other streams or unit operations.
    """
    CompoundPropogation(stream, expected_compounds).run_for_stream(stream)

103 

104 

class StreamsToUpdateItem(TypedDict):
    """
    Pair of string lists describing additions and removals.

    NOTE(review): presumably these are compound keys to add to / remove from a
    stream; the type is not referenced elsewhere in the visible code - confirm.
    """

    # Entries to add.
    add: list[str]
    # Entries to remove.
    remove: list[str]

108 

109 

class CompoundPropogation:
    """
    Handles the addition and removal of chemical compounds in process streams,
    and how those compounds propagate through connected equipment
    (like pumps, mixers, etc.).

    New rows are staged in the `create_*` lists by `_update_compounds` and
    written in bulk by `_handle_database_changes`. Removals are executed
    immediately inside `_update_compounds`.
    """

    def __init__(self, stream: "SimulationObject", expected_compounds: list[str]) -> None:
        """
        Args:
            stream: The object the propagation starts from.
            expected_compounds: Compound keys every visited object should end up with.
        """
        self.source_stream: "SimulationObject" = stream
        self.expected_compounds: list[str] = expected_compounds
        # Unsaved model instances staged for bulk creation.
        self.create_property_values = []
        self.create_indexed_items = []
        self.create_property_value_indexed_items = []
        # Ids staged for bulk deletion. Nothing in this class fills these lists;
        # they are only read by _handle_database_changes.
        self.delete_property_values = []
        self.delete_indexed_items = []

    def _update_compounds(self, unitop: "SimulationObject") -> None:
        """
        Update the compounds of one object: delete those that are no longer
        expected and stage new indexed items / property values for those that
        are expected but missing.

        Note that this does not set the enabled status correctly, so
        reevaluate_properties_enabled has to be called afterwards.
        """
        new_indexed_items = []
        if "compound" in unitop.schema.indexSets:
            compound_indexed_items = IndexedItem.objects.filter(owner=unitop, type=IndexChoices.Compound)
            indexed_items_to_remove = compound_indexed_items.exclude(key__in=self.expected_compounds)
            indexed_items_to_keep = compound_indexed_items.filter(key__in=self.expected_compounds)

            # Remove compounds that are no longer expected: first the property
            # values that reference them, then the indexed items themselves.
            PropertyValue.objects.filter(indexedItems__in=indexed_items_to_remove).delete()
            indexed_items_to_remove.delete()

            present_compounds = set(indexed_items_to_keep.values_list("key", flat=True))
            for compound in self.expected_compounds:
                if compound in present_compounds:
                    continue
                # Not present yet: stage a new indexed item for it.
                new_index = IndexedItem(
                    owner=unitop,
                    key=compound,
                    displayName=compound,
                    type=IndexChoices.Compound,
                    flowsheet=unitop.flowsheet,
                )
                new_indexed_items.append(new_index)  # New indexed items for this object only
                self.create_indexed_items.append(new_index)  # New indexed items across all objects
                # A key repeated in expected_compounds must not be staged twice.
                present_compounds.add(compound)

        # Update compound-dependent properties (e.g. compound separator split fraction).
        for property_info in unitop.properties.ContainedProperties.all():
            if property_info.key not in unitop.schema.properties:
                # Custom property: no compounds need to be added to it.
                continue
            index_sets = unitop.schema.properties[property_info.key].indexSets
            if index_sets is not None and "compound" in index_sets:
                self._property_add_remove(unitop, property_info, new_indexed_items)

    def _property_add_remove(self, unitop: "SimulationObject", propertyInfo: "PropertyInfo", new_indexed_items: "list[IndexedItem]") -> None:
        """
        Stage property values (and their links to indexed items) on
        `propertyInfo` for each newly added compound indexed item.

        Property values of removed compounds are expected to have been deleted
        already by `_update_compounds`.
        """
        schema = unitop.schema.properties[propertyInfo.key]
        group = schema.propertySetGroup
        enabled = propertyInfo.key in unitop.schema.propertySetGroups[group].stateVars

        # The non-compound indexes this property is also indexed by.
        other_indexes = IndexedItem.objects.filter(
            owner=unitop, type__in=schema.indexSets
        ).exclude(type=IndexChoices.Compound)

        combinations = list(itertools.product(list(other_indexes), new_indexed_items))
        property_values = []

        if schema.sumToOne:
            # Leave the trailing combination(s) disabled, as they can be
            # calculated from the other compounds.
            # NOTE(review): this subtracts the number of indexed items staged so
            # far across all objects, not just this one - confirm intended.
            cutoff = len(combinations) - 1 - len(self.create_indexed_items)
        else:
            # All combinations should be enabled.
            cutoff = len(combinations)

        for index, idxes in enumerate(combinations):
            property_value = PropertyValue(
                value=schema.value,
                property=propertyInfo,
                enabled=enabled if index <= cutoff else False,
                flowsheet=unitop.flowsheet,
            )
            for indexed_item in idxes:
                self.create_property_value_indexed_items.append(
                    PropertyValueIntermediate(propertyvalue=property_value, indexeditem=indexed_item)
                )
            property_values.append(property_value)

        if not combinations:
            # There are no indexes other than the compound (e.g. mole_frac_comp):
            # just create one property value per new indexed item.
            for indexed_item in new_indexed_items:
                property_value = PropertyValue(
                    value=schema.value,
                    property=propertyInfo,
                    enabled=enabled,
                    flowsheet=unitop.flowsheet,
                )
                property_values.append(property_value)
                self.create_property_value_indexed_items.append(
                    PropertyValueIntermediate(propertyvalue=property_value, indexeditem=indexed_item)
                )

        if propertyInfo.key == "mole_frac_comp" and len(self.expected_compounds) == 1:
            # A single compound necessarily has a mole fraction of 1.0.
            for property_value in property_values:
                property_value.value = 1.0
                property_value.displayValue = 1.0

        self.create_property_values.extend(property_values)

    def run(self, include_source: bool = True) -> None:
        """
        Main method that orchestrates the entire compound update process:
        1. Identifies all affected unit operations and streams
        2. Prepares database changes (additions and removals)
        3. Updates the database in bulk for better performance
        4. Re-evaluates which properties are enabled on every affected object

        Args:
            include_source: Whether to include the source stream in the update.
                NOTE(review): this flag is not currently read by the method.
        """
        unit_ops, streams = track_stream_flow(self.source_stream)

        for unit_op in unit_ops:
            self._update_compounds(unit_op)

        for current_stream in streams:
            self._update_compounds(current_stream)

        # Perform database changes
        self._handle_database_changes()

        for simulation_object in streams | unit_ops:
            simulation_object.reevaluate_properties_enabled()

    def run_for_stream(self, stream: "SimulationObject") -> None:
        """
        Run the compound propagation for one specific stream only
        (nothing is propagated to other streams).
        """
        self._update_compounds(stream)
        self._handle_database_changes()
        stream.reevaluate_properties_enabled()

    def _handle_database_changes(self) -> None:
        """
        Write the staged changes to the database in bulk.

        Each staged list is handled independently, so an empty list never
        prevents the others from being written.
        """
        if self.delete_property_values:
            PropertyValue.objects.filter(id__in=self.delete_property_values).delete()
        if self.delete_indexed_items:
            IndexedItem.objects.filter(id__in=self.delete_indexed_items).delete()

        # Property values and indexed items are created first, then the
        # intermediate rows that link them together.
        if self.create_property_values:
            PropertyValue.objects.bulk_create(self.create_property_values)
        if self.create_indexed_items:
            IndexedItem.objects.bulk_create(self.create_indexed_items)
        if self.create_property_value_indexed_items:
            PropertyValueIntermediate.objects.bulk_create(self.create_property_value_indexed_items)

265