Coverage for backend/django/flowsheetInternals/unitops/models/compound_propogation.py: 97%
132 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1import itertools
2from typing import TYPE_CHECKING, Literal, TypedDict
4from core.auxiliary.models.IndexedItem import IndexedItem, IndexChoices
5from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate
6from core.auxiliary.models.PropertyInfo import PropertyInfo
7from flowsheetInternals.unitops.models.flow_tracking import track_stream_flow
8from flowsheetInternals.unitops.config.config_methods import get_connected_port_keys
9if TYPE_CHECKING:
10 from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
11 from flowsheetInternals.unitops.models.Port import Port
def update_compounds_on_set(stream: "SimulationObject", expected_keys: list[str]) -> None:
    """
    Make the stream's compounds match ``expected_keys`` and propagate the change.

    Duplicate keys are dropped (the first occurrence keeps its position) before
    a ``CompoundPropogation`` is built for ``stream`` and run.
    """
    # A dict keeps insertion order, so repeated keys collapse onto their first position.
    ordered_unique_keys = list({key: None for key in expected_keys})
    CompoundPropogation(stream, ordered_unique_keys).run()
def update_compounds_on_add_stream(port: "Port", stream: "SimulationObject") -> None:
    """
    Handle compound propagation when a stream is added to a port.

    The compounds of every stream attached to the unit operation's connected
    ports (as reported by ``get_connected_port_keys``) are gathered and applied
    to ``stream`` only; nothing is propagated to other streams. ``stream`` then
    takes the property package type of the last neighbouring stream.

    When no connected port has a stream, ``stream`` is synchronised against an
    empty compound list and its property package type is left unchanged.

    Args:
        port: The port the stream was attached to.
        stream: The stream that was attached.
    """
    # no mole_frac_comp -> probably a bus
    if not stream.properties.ContainedProperties.filter(key="mole_frac_comp").exists():
        return

    unitop = port.unitOp
    other_port_keys = get_connected_port_keys(port.key, unitop.schema)
    other_ports = unitop.ports.filter(key__in=other_port_keys)
    neighbour_streams = [other_port.stream for other_port in other_ports if other_port.stream]

    # Dict keys de-duplicate while keeping first-seen order, so the compounds
    # are created in a deterministic order (a set would not guarantee that).
    compound_keys: dict[str, None] = {}
    for neighbour_stream in neighbour_streams:
        compound_keys.update(dict.fromkeys(_get_compound_keys(neighbour_stream)))

    propagator = CompoundPropogation(stream, list(compound_keys))
    propagator.run_for_stream(stream)

    # Guard on the list rather than on the loop variable: the loop variable is
    # never bound when there are no neighbouring streams.
    if neighbour_streams:
        stream.propertyPackageType = neighbour_streams[-1].propertyPackageType
        stream.save()
def update_compounds_on_merge(inlet_stream: "SimulationObject", outlet_stream: "SimulationObject") -> None:
    """
    Handle compound propagation when two streams are merged.

    The union of both streams' compounds is propagated starting from
    ``inlet_stream``; the inlet then takes the outlet's property package type,
    which is handed to ``propogate_property_package``.

    Args:
        inlet_stream: Stream used as the propagation source.
        outlet_stream: Stream whose compounds and property package type are merged in.
    """
    # Combine the lists, removing duplicates. dict.fromkeys keeps first-seen
    # order (outlet compounds first), so the result is deterministic.
    expected_keys = list(
        dict.fromkeys(_get_compound_keys(outlet_stream) + _get_compound_keys(inlet_stream))
    )

    propagator = CompoundPropogation(inlet_stream, expected_keys)
    # NOTE(review): CompoundPropogation.run() accepts include_source but does
    # not read it, so this flag currently has no effect - confirm the intent.
    propagator.run(include_source=False)

    # Imported here rather than at module level - presumably to avoid a circular import.
    from flowsheetInternals.unitops.models.property_package_propogation import propogate_property_package
    inlet_stream.propertyPackageType = outlet_stream.propertyPackageType
    propogate_property_package(inlet_stream)
def _get_compound_keys(stream: "SimulationObject") -> list[str]:
    """Return the ``key`` of every compound-type ``IndexedItem`` owned by ``stream``."""
    keys: list[str] = []
    for compound_item in IndexedItem.objects.filter(owner=stream, type=IndexChoices.Compound):
        keys.append(compound_item.key)
    return keys
75# "and" should never be in a function name,
76# because one function should do one thing.
77# TODO: Split this into two separate functions.
def update_decision_node_and_propagate(decisionNode: "SimulationObject", updated_via_right_click: bool = False):
    """
    Gather the compounds of every stream attached to the decision node's ports,
    then run compound propagation starting from the decision node itself.

    ``updated_via_right_click`` is accepted but not used by this function.
    """
    gathered_keys = set()
    for node_port in decisionNode.ports.all():
        if not node_port.stream:
            # Port has no stream attached - nothing to collect.
            continue
        gathered_keys.update(_get_compound_keys(node_port.stream))

    # Propagate the combined compound list from the decision node.
    CompoundPropogation(decisionNode, list(gathered_keys)).run()
def run_for_stream(stream: "SimulationObject", expected_compounds: list[str]) -> None:
    """
    Apply ``expected_compounds`` to a single stream.

    Only ``stream`` is updated; the change is not propagated to other streams.
    """
    CompoundPropogation(stream, expected_compounds).run_for_stream(stream)
class StreamsToUpdateItem(TypedDict):
    """Typed dictionary holding one ``add`` list and one ``remove`` list of strings."""

    # presumably compound keys to add - not referenced in the visible part of this file
    add: list[str]
    # presumably compound keys to remove - not referenced in the visible part of this file
    remove: list[str]
class CompoundPropogation:
    """
    Handles the addition and removal of chemical compounds in process streams.

    It manages how compounds propagate through connected equipment (like pumps,
    mixers, etc.). Removals are executed immediately while each object is
    processed; additions are queued on the instance and written in bulk by
    ``_handle_database_changes``.
    """

    def __init__(self, stream: "SimulationObject", expected_compounds: list[str]) -> None:
        """
        Args:
            stream: The object propagation starts from.
            expected_compounds: Compound keys every processed object should end up with.
        """
        self.source_stream: "SimulationObject" = stream
        self.expected_compounds: list[str] = expected_compounds
        # Unsaved model instances queued for bulk creation.
        self.create_property_values = []
        self.create_indexed_items = []
        self.create_property_value_indexed_items = []
        # Ids queued for bulk deletion. No method of this class fills these
        # lists; they are only read by _handle_database_changes.
        self.delete_property_values = []
        self.delete_indexed_items = []

    def _update_compounds(self, unitop: "SimulationObject") -> None:
        """
        Updates the compounds in an object, deleting any that are no longer present
        and adding any that are now present.

        Deletions hit the database immediately. New indexed items and their
        property values are only queued; ``_handle_database_changes`` saves them.
        """
        if "compound" not in unitop.schema.indexSets:
            # No compound index set on this object, so no indexed items are
            # added or removed and there is nothing to create property values for.
            return

        # Remove any compounds that are not in the list of expected compounds
        compound_indexed_items = IndexedItem.objects.filter(owner=unitop, type=IndexChoices.Compound)
        indexed_items_to_remove = compound_indexed_items.exclude(key__in=self.expected_compounds)
        indexed_items_to_keep = compound_indexed_items.filter(key__in=self.expected_compounds)

        # Delete the property values first: their filter is evaluated against
        # indexed_items_to_remove, which must still exist at that point.
        PropertyValue.objects.filter(indexedItems__in=indexed_items_to_remove).delete()
        indexed_items_to_remove.delete()

        # Materialise once into a set for cheap membership tests.
        present_compounds = set(indexed_items_to_keep.values_list('key', flat=True))
        new_indexed_items = []  # New indexed items for this object only
        for compound in self.expected_compounds:
            if compound in present_compounds:
                continue
            # If the compound is not already present, create a new indexed item for it
            new_index = IndexedItem(owner=unitop, key=compound, displayName=compound, type=IndexChoices.Compound, flowsheet=unitop.flowsheet)
            new_indexed_items.append(new_index)
            self.create_indexed_items.append(new_index)  # New indexed items across all objects
            # A repeated key in expected_compounds must not queue a second item.
            present_compounds.add(compound)

        # Update compound-dependent properties (e.g. compound separator split fraction)
        for property_info in unitop.properties.ContainedProperties.all():
            index_sets = unitop.schema.properties[property_info.key].indexSets
            if index_sets is not None and "compound" in index_sets:
                self._property_add_remove(unitop, property_info, new_indexed_items)

        # Note that this does not set the enabled status correctly, so we need to reevaluate_properties_enabled later.

    def _property_add_remove(self, unitop: "SimulationObject", propertyInfo: "PropertyInfo", new_indexed_items: "list[IndexedItem]") -> None:
        """
        Queues the property values (and their index links) that ``propertyInfo``
        needs for the newly added compound indexed items.

        Nothing is deleted here; stale property values are removed by
        ``_update_compounds`` before this method is called.

        Args:
            unitop: Object that owns the property.
            propertyInfo: The compound-indexed property to extend.
            new_indexed_items: Unsaved compound indexed items created for ``unitop``.
        """
        # get the other indexes for this property
        schema = unitop.schema.properties[propertyInfo.key]
        group = schema.propertySetGroup
        enabled = propertyInfo.key in unitop.schema.propertySetGroups[group].stateVars
        other_indexes = IndexedItem.objects.filter(
            owner=unitop, type__in=schema.indexSets
        ).exclude(type=IndexChoices.Compound)

        # One property value per (other index, new compound) pair.
        combinations = list(itertools.product(list(other_indexes), new_indexed_items))
        property_values = []

        if schema.sumToOne:
            # Leave the trailing combinations disabled, as they can be calculated from the other compounds.
            # NOTE(review): the cutoff subtracts the running total of new
            # indexed items across every object processed so far, not just
            # this object's - confirm that is intended. The enabled flags are
            # re-evaluated after the bulk write in any case.
            cutoff = len(combinations) - 1 - len(self.create_indexed_items)
        else:
            # all combinations should be enabled
            cutoff = len(combinations)

        for index, idxes in enumerate(combinations):
            property_value = PropertyValue(
                value=schema.value,
                property=propertyInfo,
                enabled=enabled if index <= cutoff else False,
                flowsheet=unitop.flowsheet,
            )
            for indexed_item in idxes:
                self.create_property_value_indexed_items.append(
                    PropertyValueIntermediate(propertyvalue=property_value, indexeditem=indexed_item)
                )
            property_values.append(property_value)

        if not combinations:
            # There are no other indexes other than for this compound. (e.g mole_frac_comp)
            # just create a property value for each indexed item
            for indexed_item in new_indexed_items:
                property_value = PropertyValue(value=schema.value, property=propertyInfo, enabled=enabled, flowsheet=unitop.flowsheet)
                property_values.append(property_value)
                self.create_property_value_indexed_items.append(
                    PropertyValueIntermediate(propertyvalue=property_value, indexeditem=indexed_item)
                )

        if propertyInfo.key == "mole_frac_comp" and len(self.expected_compounds) == 1:
            # If this is a mole_frac_comp property and there is only one compound, set the value to 1.0
            for property_value in property_values:
                property_value.value = 1.0
                property_value.displayValue = 1.0

        self.create_property_values.extend(property_values)

    def run(self, include_source: bool = True) -> None:
        """
        Main method that orchestrates the entire compound update process:
        1. Identifies all affected unit operations and streams
        2. Updates the compounds of each of them (removals now, additions queued)
        3. Writes the queued additions to the database in bulk
        4. Re-evaluates which properties are enabled on every affected object

        Args:
            include_source: Whether to include the source stream in the update.
                NOTE(review): this flag is not read anywhere in this method, so
                it currently has no effect - confirm the intended behaviour.
        """
        unit_ops, streams = track_stream_flow(self.source_stream)

        for unit_op in unit_ops:
            self._update_compounds(unit_op)

        for current_stream in streams:
            self._update_compounds(current_stream)

        # Perform database changes
        self._handle_database_changes()

        for simulation_object in streams | unit_ops:
            simulation_object.reevaluate_properties_enabled()

    def run_for_stream(self, stream: "SimulationObject") -> None:
        """
        Runs the compound propagation only for a specific stream (doesn't propagate to other streams).
        """
        self._update_compounds(stream)
        self._handle_database_changes()
        stream.reevaluate_properties_enabled()

    def _handle_database_changes(self) -> None:
        """
        Writes the queued deletions and creations to the database in bulk.

        Every list is checked on its own, so queued indexed items are saved even
        when no property values were queued alongside them. Property values and
        indexed items are created before the intermediate rows that link them.
        """
        if self.delete_property_values:
            PropertyValue.objects.filter(id__in=self.delete_property_values).delete()
        if self.delete_indexed_items:
            IndexedItem.objects.filter(id__in=self.delete_indexed_items).delete()

        if self.create_property_values:
            PropertyValue.objects.bulk_create(self.create_property_values)
        if self.create_indexed_items:
            IndexedItem.objects.bulk_create(self.create_indexed_items)
        if self.create_property_value_indexed_items:
            PropertyValueIntermediate.objects.bulk_create(self.create_property_value_indexed_items)