Coverage for backend/flowsheetInternals/unitops/models/flow_tracking.py: 100%
35 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1from flowsheetInternals.unitops.config.config_methods import get_connected_port_keys
2from core.auxiliary.enums.unitOpGraphics import ConType
3from core.auxiliary.enums.unitOpData import SimulationObjectClass
4from typing import TYPE_CHECKING, Literal, TypedDict
6if TYPE_CHECKING:
7 from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
8 from flowsheetInternals.unitops.models.Port import Port
12# test_compound_population provides good coverage of this code.
def track_stream_flow(stream: "SimulationObject") -> tuple[set["SimulationObject"], set["SimulationObject"]]:
    """
    Tracks the flow of a stream through the system.

    Starting from the given stream, this walks breadth-first over every unit
    operation attached to a stream's connected ports, and then on to the streams
    attached to the ports returned by get_connected_ports for that unit operation.
    Translator unit operations are not crossed and are not added to the result,
    because the property package has to be re-defined on the other side.

    NOTE(review): earlier documentation also said tracking stops at recycles.
    Nothing in this function checks for recycles explicitly; presumably that is
    decided by the port keys the unit operation's schema exposes (the
    propertyPackagePorts config parameter) - confirm.

    Arguments:
        stream: The stream simulation object to start from. Can also be a
            decision node, in which case the decision node is recorded as a unit
            operation and tracking starts from the streams on its outlet ports.
    Returns:
        A tuple containing:
        - The set of all unit operations that the fluid goes through
        - The set of all streams that have properties of the fluid
    """
    # Function-scope import so this block does not depend on the module header.
    from collections import deque

    unit_ops = set()
    # Every stream discovered so far. A stream is added here at the moment it is
    # queued, so this set doubles as the "already scheduled" check and each
    # stream is processed exactly once.
    streams = set()
    pending = deque()

    def enqueue(candidate):
        # Ports without a stream yield None; those are simply skipped.
        if candidate is not None and candidate not in streams:
            streams.add(candidate)
            pending.append(candidate)

    if stream.objectType == SimulationObjectClass.DecisionNode:
        # It's not actually a stream: record it as a unit operation and start
        # from the streams on its outlets instead.
        unit_ops.add(stream)
        for outlet in stream.ports.filter(direction=ConType.Outlet):
            enqueue(outlet.stream)
    else:
        enqueue(stream)

    while pending:
        current_stream = pending.popleft()
        for stream_port in current_stream.connectedPorts.all():
            unit_op = stream_port.unitOp

            # If it's a translator, we stop tracking flow
            # (separate property package and compounds on the other side).
            if unit_op.objectType == SimulationObjectClass.Translator:
                continue

            unit_ops.add(unit_op)

            # Queue any other streams on this unit operation that carry the
            # same flow and that we haven't already seen.
            for linked_port in get_connected_ports(stream_port):
                enqueue(linked_port.stream)

    return unit_ops, streams
def get_connected_ports(port: "Port"):
    """
    Find the sibling ports that represent the same "flow" as this port.

    Looks up the unit operation that owns the port, asks get_connected_port_keys
    for the port keys linked to this port's key under that unit operation's
    schema, and returns the unit operation's ports filtered to those keys.
    """
    owner: "SimulationObject" = port.unitOp
    linked_keys = get_connected_port_keys(port.key, owner.schema)
    return owner.ports.filter(key__in=linked_keys)