Coverage for backend/django/flowsheetInternals/unitops/models/compound_propogation.py: 96%
135 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1import itertools
2from typing import TYPE_CHECKING, Literal, TypedDict
4from core.auxiliary.models.IndexedItem import IndexedItem, IndexChoices
5from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate
6from core.auxiliary.models.PropertyInfo import PropertyInfo
7from flowsheetInternals.unitops.models.flow_tracking import track_stream_flow
8from flowsheetInternals.unitops.config.config_methods import get_connected_port_keys
9if TYPE_CHECKING:
10 from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
11 from flowsheetInternals.unitops.models.Port import Port
15def update_compounds_on_set(stream: "SimulationObject", expected_keys: list[str]) -> None:
16 """
17 Directly sets the compounds in a stream to match the expected compounds.
18 """
19 # Preserve the first-seen order while ignoring duplicate compound keys.
20 unique_expected_keys = list(dict.fromkeys(expected_keys))
21 propagator = CompoundPropogation(stream, unique_expected_keys)
22 propagator.run()
25def update_compounds_on_add_stream(port: "Port", stream: "SimulationObject") -> None:
26 """
27 Handles compound propogation when a stream is added to a port.
28 """
29 # no mole_frac_comp -> probably a bus
30 if not stream.properties.ContainedProperties.filter(key="mole_frac_comp").exists():
31 return
32 unitop = port.unitOp
35 other_port_keys = get_connected_port_keys(port.key, unitop.schema)
37 other_ports = unitop.ports.filter(
38 key__in=other_port_keys,
39 )
40 streams = [port.stream for port in other_ports if port.stream]
41 unique_compounds = set()
42 for neighbour_stream in streams:
43 unique_compounds.update(_get_compound_keys(neighbour_stream))
46 propagator = CompoundPropogation(stream, unique_compounds)
47 propagator.run_for_stream(stream)
49 if neighbour_stream is not None: 49 ↛ exitline 49 didn't return from function 'update_compounds_on_add_stream' because the condition on line 49 was always true
50 stream.propertyPackageType = neighbour_stream.propertyPackageType
51 stream.save()
54def update_compounds_on_merge(inlet_stream: "SimulationObject", outlet_stream: "SimulationObject") -> None:
55 """
56 Handles compound propogation when two streams are merged.
57 """
58 expected_keys = _get_compound_keys(outlet_stream)
59 other_expected_keys = _get_compound_keys(inlet_stream)
60 # combine the lists, removing duplicates
61 expected_keys.extend(other_expected_keys)
62 expected_keys = list(set(expected_keys))
63 propogator = CompoundPropogation(inlet_stream, expected_keys)
64 propogator.run(include_source=False)
66 from flowsheetInternals.unitops.models.property_package_propogation import propogate_property_package
67 inlet_stream.propertyPackageType = outlet_stream.propertyPackageType
68 propogate_property_package(inlet_stream)
71def _get_compound_keys(stream: "SimulationObject") -> list[str]:
72 indexed_items = IndexedItem.objects.filter(owner=stream, type=IndexChoices.Compound)
73 return [item.key for item in indexed_items]
75# "and" should never be in a function name,
76# because one function should do one thing.
77# TODO: Split this into two seperate functions.
78def update_decision_node_and_propagate(decisionNode: "SimulationObject", updated_via_right_click: bool = False):
79 """
80 Look through all the decision node's inlet streams to find a sum of the compounds
81 Then call propagate for every outlet stream of the decision node
82 """
83 compound_key_set = set()
85 for port in decisionNode.ports.all():
86 if port.stream:
87 compound_key_set.update(_get_compound_keys(port.stream))
89 # Create propagator with all compounds
90 compound_keys = list(compound_key_set)
92 # Update the decision node itself
93 propagator = CompoundPropogation(decisionNode, compound_keys)
94 propagator.run()
97def run_for_stream(stream: "SimulationObject", expected_compounds: list[str]) -> None:
98 """
99 Runs the compound propogation for a specific stream (doesn't propogate to other streams).
100 """
101 propagator = CompoundPropogation(stream, expected_compounds)
102 propagator.run_for_stream(stream)
105class StreamsToUpdateItem(TypedDict):
106 add: list[str]
107 remove: list[str]
110class CompoundPropogation:
111 """
112 This class handles the addition and removal of chemical compounds in process streams.
113 It manages how compounds propagate through connected equipment (like pumps, mixers, etc.)
114 """
115 def __init__(self, stream: "SimulationObject", expected_compounds: list[str]) -> None:
116 self.source_stream: "SimulationObject" = stream
117 self.expected_compounds: list[str] = expected_compounds
118 self.create_property_values = []
119 self.create_indexed_items = []
120 self.create_property_value_indexed_items = []
121 self.delete_property_values = []
122 self.delete_indexed_items = []
124 def _update_compounds(self, unitop: "SimulationObject") -> None:
125 """
126 Updates the compounds in an object, deleting any that are no longer present
127 and adding any that are now present.
128 """
129 new_indexed_items = []
130 if "compound" in unitop.schema.indexSets:
131 # Remove any compounds that are not in the list of expected compounds
132 compound_indexed_items = IndexedItem.objects.filter(owner=unitop, type=IndexChoices.Compound)
133 indexed_items_to_remove = compound_indexed_items.exclude(key__in=self.expected_compounds)
134 indexed_items_to_keep = compound_indexed_items.filter(key__in=self.expected_compounds)
136 property_values_to_remove = PropertyValue.objects.filter(indexedItems__in=indexed_items_to_remove)
137 property_values_to_remove.delete()
138 indexed_items_to_remove.delete()
140 present_compounds: list[str] = indexed_items_to_keep.values_list('key', flat=True)
141 for compound in self.expected_compounds:
142 if compound not in present_compounds:
143 # If the compound is not already present, create a new indexed item for it
145 new_index = IndexedItem(owner=unitop, key=compound, displayName=compound, type=IndexChoices.Compound, flowsheet=unitop.flowsheet)
146 new_indexed_items.append(new_index) # List of new indexed items for this unit op
147 self.create_indexed_items.append(new_index) # List of all new indexed items for all unit ops
149 # Create property info
150 propertySet = unitop.properties
152 # Update compound-dependent properties ( e.g compound separator split fraction)
153 for property in propertySet.ContainedProperties.all ():
154 if property.key not in unitop.schema.properties: 154 ↛ 155line 154 didn't jump to line 155 because the condition on line 154 was never true
155 continue # This is a custom property so we don't have to worry about adding compounds to it.
156 property_schema = unitop.schema.properties[property.key]
157 index_sets = property_schema.indexSets
158 if index_sets is not None and "compound" in index_sets:
159 self._property_add_remove(unitop,property,new_indexed_items)
161 # Note that this does not set the enabled status correctly, so we need to reevaluate_properties_enabled later..
164 def _property_add_remove(self, unitop:"SimulationObject", propertyInfo: "PropertyInfo",new_indexed_items: list[IndexedItem]) -> None:
165 """
166 Adds or removes the indexes and property values from the propertyInfo
167 """
168 # get the other indexes for this property
169 schema = unitop.schema.properties[propertyInfo.key]
170 group = schema.propertySetGroup
171 enabled = propertyInfo.key in unitop.schema.propertySetGroups[group].stateVars
172 other_indexes = IndexedItem.objects.filter(
173 owner=unitop, type__in=schema.indexSets
174 ).exclude(type=IndexChoices.Compound)
176 # The propertyValues to delete should already have been deleted by now.
178 indexes = [list(other_indexes), new_indexed_items]
179 combinations = list(itertools.product(*indexes))
180 property_values = []
182 if schema.sumToOne:
183 # Leave the last combination as disabled, as they can be calculated from the other compounds.
184 cutoff = len(combinations) - 1 - len(self.create_indexed_items)
185 else:
186 # all combinations should be enabled
187 cutoff = len(combinations)
188 for index, idxes in enumerate(combinations):
189 if index <= cutoff:
190 property_value = PropertyValue(value=schema.value, property=propertyInfo, enabled=enabled, flowsheet=unitop.flowsheet)
191 else:
192 property_value = PropertyValue(value=schema.value, property=propertyInfo, enabled=False, flowsheet=unitop.flowsheet)
193 for indexed_item in idxes:
194 self.create_property_value_indexed_items.append(
195 PropertyValueIntermediate(propertyvalue=property_value, indexeditem=indexed_item)
196 )
197 property_values.append(property_value)
198 if len(combinations) == 0:
199 # There are no other indexes other than for this compound. (e.g mole_frac_comp)
200 # just create a property value for each indexed item
201 for indexed_item in new_indexed_items:
202 property_value = PropertyValue(value=schema.value, property=propertyInfo, enabled=enabled, flowsheet=unitop.flowsheet)
203 property_values.append(property_value)
204 self.create_property_value_indexed_items.append(
205 PropertyValueIntermediate(propertyvalue=property_value, indexeditem=indexed_item)
206 )
208 if propertyInfo.key == "mole_frac_comp" and len(self.expected_compounds) == 1:
209 # If this is a mole_frac_comp property and there is only one compound, set the value to 1.0
210 for property_value in property_values:
211 property_value.value = 1.0
212 property_value.displayValue = 1.0
214 self.create_property_values.extend(property_values)
217 def run(self, include_source: bool = True) -> None:
218 """
219 Main method that orchestrates the entire compound update process:
220 1. Identifies all affected streams
221 2. Prepares database changes (additions and removals)
222 3. Updates the database in bulk for better performance
223 4. Handles read/write permissions for properties
225 Args:
226 - include_source: Whether to include the source stream in the update
227 """
229 unit_ops, streams = track_stream_flow(self.source_stream)
231 for unit_op in unit_ops:
232 self._update_compounds(unit_op)
234 for current_stream in streams:
235 self._update_compounds(current_stream)
237 # Perform database changes
238 self._handle_database_changes()
240 objects = streams | unit_ops
241 for simulation_object in objects:
242 simulation_object.reevaluate_properties_enabled()
244 def run_for_stream(self, stream: "SimulationObject") -> None:
245 """
246 Runs the compound propogation only for a specific stream (doesn't propogate to other streams).
247 """
248 self._update_compounds(stream)
249 self._handle_database_changes()
250 stream.reevaluate_properties_enabled()
252 def _handle_database_changes(self) -> None: # First
253 """
254 Handles the database changes for the compound propogation based on the streams_to_update dictionary.
255 """
257 # Perform bulk operations
258 if self.delete_property_values: 258 ↛ 259line 258 didn't jump to line 259 because the condition on line 258 was never true
259 PropertyValue.objects.filter(id__in=self.delete_property_values).delete()
260 IndexedItem.objects.filter(id__in=self.delete_indexed_items).delete()
261 if self.create_property_values:
262 PropertyValue.objects.bulk_create(self.create_property_values)
263 IndexedItem.objects.bulk_create(self.create_indexed_items)
264 PropertyValueIntermediate.objects.bulk_create(self.create_property_value_indexed_items)