Coverage for backend/flowsheetInternals/unitops/models/compound_propogation.py: 97%
128 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1import itertools
2import copy
3from typing import TYPE_CHECKING, Literal, TypedDict
5from core.auxiliary.models.IndexedItem import IndexedItem, IndexChoices
6from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate
7from core.auxiliary.models.PropertyInfo import PropertyInfo
8from core.auxiliary.enums.unitOpGraphics import ConType
9from core.auxiliary.enums.unitOpData import SimulationObjectClass
10from flowsheetInternals.unitops.models.flow_tracking import track_stream_flow
11from flowsheetInternals.unitops.config.config_methods import get_connected_port_keys
13if TYPE_CHECKING:
14 from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
15 from flowsheetInternals.unitops.models.Port import Port
def update_compounds_on_set(stream: "SimulationObject", expected_keys: list[str]) -> None:
    """
    Make the compounds of a stream match ``expected_keys``.

    Builds a CompoundPropogation seeded with ``stream`` and ``expected_keys``
    and calls its ``run()`` method.
    """
    CompoundPropogation(stream, expected_keys).run()
def update_compounds_on_add_stream(port: "Port", stream: "SimulationObject") -> None:
    """
    Handles compound propagation when a stream is added to a port.

    Collects the compound keys of every stream attached to the ports that
    ``get_connected_port_keys`` reports for ``port`` on the same unit op, then
    applies that combined compound list to ``stream`` only (via
    ``CompoundPropogation.run_for_stream``).

    Args:
        port: The port the stream was attached to.
        stream: The stream that was added.
    """
    # no mole_frac_comp -> probably a bus
    if not stream.properties.ContainedProperties.filter(key="mole_frac_comp").exists():
        return

    unitop = port.unitOp
    other_port_keys = get_connected_port_keys(port.key, unitop.schema)
    other_ports = unitop.ports.filter(key__in=other_port_keys)

    unique_compounds: set[str] = set()
    # Distinct loop name so the ``port`` parameter is not shadowed.
    for other_port in other_ports:
        neighbour_stream = other_port.stream
        if neighbour_stream:
            unique_compounds.update(_get_compound_keys(neighbour_stream))

    # CompoundPropogation is declared to take a list; sorting also makes the
    # order in which new compounds are created deterministic (set iteration
    # order for strings is not stable between interpreter runs).
    propagator = CompoundPropogation(stream, sorted(unique_compounds))
    propagator.run_for_stream(stream)
def update_compounds_on_merge(inlet_stream: "SimulationObject", outlet_stream: "SimulationObject") -> None:
    """
    Handles compound propagation when two streams are merged.

    The compound keys of the outlet and inlet streams are combined without
    duplicates, and a propagation seeded from ``inlet_stream`` is run with
    ``include_source=False``.
    """
    # A set display drops duplicates; outlet keys are read first, then inlet keys.
    merged_keys = list({
        *_get_compound_keys(outlet_stream),
        *_get_compound_keys(inlet_stream),
    })
    CompoundPropogation(inlet_stream, merged_keys).run(include_source=False)
def _get_compound_keys(stream: "SimulationObject") -> list[str]:
    """
    Return the keys of all compound-type IndexedItems owned by ``stream``.
    """
    compound_items = IndexedItem.objects.filter(
        owner=stream,
        type=IndexChoices.Compound,
    )
    return [compound_item.key for compound_item in compound_items]
71# "and" should never be in a function name,
72# because one function should do one thing.
73# TODO: Split this into two separate functions.
def update_decision_node_and_propagate(decisionNode: "SimulationObject", updated_via_right_click: bool = False):
    """
    Collect the compounds of every stream attached to the decision node and
    run compound propagation from the decision node with that combined list.

    Ports without a stream are skipped. ``updated_via_right_click`` is accepted
    but is not read anywhere in this function.
    """
    compound_key_set = set()
    for node_port in decisionNode.ports.all():
        attached_stream = node_port.stream
        if not attached_stream:
            continue
        compound_key_set.update(_get_compound_keys(attached_stream))

    # Propagate starting from the decision node itself, using every compound found.
    CompoundPropogation(decisionNode, list(compound_key_set)).run()
def run_for_stream(stream: "SimulationObject", expected_compounds: list[str]) -> None:
    """
    Apply ``expected_compounds`` to a single stream only; nothing is
    propagated to other streams.
    """
    CompoundPropogation(stream, expected_compounds).run_for_stream(stream)
class StreamsToUpdateItem(TypedDict, total=True):
    """Typed dictionary with two required lists of strings: ``add`` and ``remove``."""

    add: list[str]
    remove: list[str]
class CompoundPropogation:
    """
    This class handles the addition and removal of chemical compounds in process streams.
    It manages how compounds propagate through connected equipment (like pumps, mixers, etc.)

    New rows are accumulated on the instance while objects are processed and
    are written in bulk by ``_handle_database_changes``.
    """

    def __init__(self, stream: "SimulationObject", expected_compounds: list[str]) -> None:
        """
        Args:
            stream: The object the propagation starts from.
            expected_compounds: Compound keys every processed object should
                end up with. Any iterable of strings is accepted; duplicates
                are dropped and first-seen order is kept.
        """
        self.source_stream: "SimulationObject" = stream
        # Normalise to a duplicate-free list: a repeated key would otherwise
        # create two IndexedItems for the same compound and would defeat the
        # single-compound check in _property_add_remove.
        self.expected_compounds: list[str] = list(dict.fromkeys(expected_compounds))
        self.create_property_values = []
        self.create_indexed_items = []
        self.create_property_value_indexed_items = []
        self.delete_property_values = []
        self.delete_indexed_items = []

    def _update_compounds(self, unitop: "SimulationObject") -> None:
        """
        Updates the compounds in an object, deleting any that are no longer present
        and adding any that are now present.

        Removals are executed immediately; additions are only queued on the
        instance and written later by ``_handle_database_changes``.
        """
        new_indexed_items: "list[IndexedItem]" = []
        if "compound" in unitop.schema.indexSets:
            compound_indexed_items = IndexedItem.objects.filter(owner=unitop, type=IndexChoices.Compound)
            indexed_items_to_remove = compound_indexed_items.exclude(key__in=self.expected_compounds)
            indexed_items_to_keep = compound_indexed_items.filter(key__in=self.expected_compounds)

            # Remove compounds that are no longer expected: first the property
            # values attached to them, then the indexed items themselves.
            PropertyValue.objects.filter(indexedItems__in=indexed_items_to_remove).delete()
            indexed_items_to_remove.delete()

            # Materialise once into a set for cheap membership tests below.
            present_compounds = set(indexed_items_to_keep.values_list("key", flat=True))
            for compound in self.expected_compounds:
                if compound in present_compounds:
                    continue
                # The compound is not present yet: queue a new indexed item for it.
                new_index = IndexedItem(
                    owner=unitop,
                    key=compound,
                    displayName=compound,
                    type=IndexChoices.Compound,
                    flowsheet=unitop.flowsheet,
                )
                new_indexed_items.append(new_index)  # new indexed items for this unit op
                self.create_indexed_items.append(new_index)  # new indexed items for all unit ops

        # Update compound-dependent properties (e.g. compound separator split fraction).
        for property_info in unitop.properties.ContainedProperties.all():
            index_sets = unitop.schema.properties[property_info.key].indexSets
            if index_sets is not None and "compound" in index_sets:
                self._property_add_remove(unitop, property_info, new_indexed_items)

        # Note that this does not set the enabled status correctly, so
        # reevaluate_properties_enabled has to be called afterwards.

    def _property_add_remove(self, unitop: "SimulationObject", propertyInfo: "PropertyInfo", new_indexed_items: "list[IndexedItem]") -> None:
        """
        Queues the property values (and their index links) that ``propertyInfo``
        needs for the newly added compound indexed items.

        Nothing is written to the database here; the new objects are appended
        to the instance's pending-create lists.
        """
        schema = unitop.schema.properties[propertyInfo.key]
        group = schema.propertySetGroup
        enabled = propertyInfo.key in unitop.schema.propertySetGroups[group].stateVars

        # The non-compound indexes this property is also indexed by.
        other_indexes = IndexedItem.objects.filter(
            owner=unitop, type__in=schema.indexSets
        ).exclude(type=IndexChoices.Compound)

        # The property values to delete should already have been deleted by now.
        combinations = list(itertools.product(list(other_indexes), new_indexed_items))
        property_values = []

        if schema.sumToOne:
            # Leave the trailing combinations disabled, as they can be calculated
            # from the other compounds.
            # NOTE(review): this subtracts the number of indexed items queued
            # across *all* objects processed so far, not only this one - confirm
            # that is intended.
            cutoff = len(combinations) - 1 - len(self.create_indexed_items)
        else:
            # all combinations should be enabled
            cutoff = len(combinations)

        for index, idxes in enumerate(combinations):
            property_value = PropertyValue(
                value=schema.value,
                property=propertyInfo,
                enabled=enabled and index <= cutoff,
                flowsheet=unitop.flowsheet,
            )
            for indexed_item in idxes:
                self.create_property_value_indexed_items.append(
                    PropertyValueIntermediate(propertyvalue=property_value, indexeditem=indexed_item)
                )
            property_values.append(property_value)

        if not combinations:
            # There are no other indexes other than for this compound (e.g. mole_frac_comp):
            # just create a property value for each new indexed item.
            for indexed_item in new_indexed_items:
                property_value = PropertyValue(
                    value=schema.value,
                    property=propertyInfo,
                    enabled=enabled,
                    flowsheet=unitop.flowsheet,
                )
                property_values.append(property_value)
                self.create_property_value_indexed_items.append(
                    PropertyValueIntermediate(propertyvalue=property_value, indexeditem=indexed_item)
                )

        if propertyInfo.key == "mole_frac_comp" and len(self.expected_compounds) == 1:
            # With a single compound the mole fraction can only be 1.0.
            for property_value in property_values:
                property_value.value = 1.0
                property_value.displayValue = 1.0

        self.create_property_values.extend(property_values)

    def run(self, include_source: bool = True) -> None:
        """
        Main method that orchestrates the entire compound update process:
        1. Identifies all affected unit ops and streams (``track_stream_flow``)
        2. Prepares database changes (additions and removals) for each of them
        3. Writes the queued additions to the database in bulk
        4. Calls ``reevaluate_properties_enabled`` on every affected object

        Args:
        - include_source: Whether to include the source stream in the update.
          NOTE(review): this flag is currently not read by the method body.
        """
        unit_ops, streams = track_stream_flow(self.source_stream)

        for unit_op in unit_ops:
            self._update_compounds(unit_op)

        for current_stream in streams:
            self._update_compounds(current_stream)

        # Perform database changes
        self._handle_database_changes()

        for simulation_object in streams | unit_ops:
            simulation_object.reevaluate_properties_enabled()

    def run_for_stream(self, stream: "SimulationObject") -> None:
        """
        Runs the compound propagation only for a specific stream (doesn't propagate to other streams).
        """
        self._update_compounds(stream)
        self._handle_database_changes()
        stream.reevaluate_properties_enabled()

    def _handle_database_changes(self) -> None:
        """
        Writes the pending deletions and creations queued on this instance.

        Each pending list is handled on its own, so queued indexed items are
        still written when no property values were queued. The pending lists
        are emptied afterwards so that running the same propagator again does
        not try to insert the same objects twice.
        """
        if self.delete_property_values:
            PropertyValue.objects.filter(id__in=self.delete_property_values).delete()
        if self.delete_indexed_items:
            IndexedItem.objects.filter(id__in=self.delete_indexed_items).delete()

        # Property values and indexed items are created before the rows that link them.
        if self.create_property_values:
            PropertyValue.objects.bulk_create(self.create_property_values)
        if self.create_indexed_items:
            IndexedItem.objects.bulk_create(self.create_indexed_items)
        if self.create_property_value_indexed_items:
            PropertyValueIntermediate.objects.bulk_create(self.create_property_value_indexed_items)

        for pending in (
            self.delete_property_values,
            self.delete_indexed_items,
            self.create_property_values,
            self.create_indexed_items,
            self.create_property_value_indexed_items,
        ):
            pending.clear()