Coverage for backend/django/PinchAnalysis/models/next_stream_data_entry.py: 85%
58 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-12-18 04:00 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-12-18 04:00 +0000
1from collections import deque
2from typing import Iterable, List, Set
4from flowsheetInternals.unitops.models import Port, SimulationObject
5from core.auxiliary.enums.unitOpGraphics import ConType
6from flowsheetInternals.unitops.config.config_methods import get_connected_port_keys
7from core.auxiliary.enums.unitOpData import SimulationObjectClass
def same_flow_ports(port):
    """
    Return the other ports of this port's unit op that share its flow.

    The candidate keys come from ``get_connected_port_keys`` applied to the
    port's key and the unit op's schema; the given port itself is excluded
    from the resulting queryset.
    """
    unit_op = port.unitOp
    unit_op_id = unit_op.id
    connected_keys = get_connected_port_keys(port.key, unit_op.schema)
    siblings = Port.objects.filter(unitOp_id=unit_op_id, key__in=connected_keys)
    return siblings.exclude(id=port.id)
def build_inlet_index(sdes: Iterable) -> dict[int, object]:
    """
    Map inlet stream id -> the StreamDataEntry whose INLET is that stream.

    Entries without an inlet stream are left out. If several entries share
    the same inlet stream, the one appearing last in ``sdes`` is kept.
    """
    # ``inlet_outlet_stream`` is read once per entry and yields (inlet, outlet).
    pairs = ((entry, entry.inlet_outlet_stream) for entry in sdes)
    return {
        inlet.id: entry
        for entry, (inlet, _outlet) in pairs
        if inlet is not None
    }
def iter_downstream_streams(stream):
    """
    Generate the streams lying strictly *downstream* of ``stream``.

    A unit op is entered only through a port where ``stream`` is attached as
    an INLET, and left only through same-flow OUTLET ports that have a stream
    connected. Translator unit ops (property-package boundary) are never
    crossed. The same stream may be yielded more than once; callers are
    expected to de-duplicate.
    """
    # Query the ports on every call so the traversal sees current connections.
    attached_ports = Port.objects.filter(stream_id=stream.id).select_related("unitOp")

    for entry_port in attached_ports:
        is_translator = entry_port.unitOp.objectType == SimulationObjectClass.Translator
        # Skip property-package boundaries and ports where the stream is
        # leaving (not entering) the unit op.
        if is_translator or entry_port.direction != ConType.Inlet:
            continue

        for exit_port in same_flow_ports(entry_port):
            # The stream FK may be empty on an unconnected port, so test the id.
            if exit_port.direction == ConType.Outlet and exit_port.stream_id:
                # Load a fresh stream instance rather than a cached relation.
                yield SimulationObject.objects.get(pk=exit_port.stream_id)
def next_sdes_for(sde, all_sdes: Iterable = None) -> List:
    """
    Find the StreamDataEntries immediately downstream of ``sde``.

    Starting at the outlet stream of ``sde``, streams are explored
    breadth-first through ``iter_downstream_streams``. As soon as a stream
    turns out to be the inlet of a *different* StreamDataEntry, that entry is
    collected (once) and the branch is not followed any further.

    ``all_sdes`` is the pool of candidate entries; when omitted, all entries
    on the same flowsheet as ``sde`` are used. An empty list is returned when
    reading the inlet/outlet streams raises or when there is no outlet stream.
    """
    entry_model = type(sde)

    # Reload the entry (with its unit op) so stale related caches are not used.
    sde = entry_model.objects.select_related("unitop").get(pk=sde.pk)

    if all_sdes is None:
        same_flowsheet = entry_model.objects.filter(flowsheet=sde.flowsheet)
        all_sdes = same_flowsheet.select_related("unitop")

    # Best effort: any failure while resolving the streams means "no results".
    try:
        _inlet, start_stream = sde.inlet_outlet_stream
    except Exception:
        return []

    if start_stream is None:
        return []

    sde_by_inlet = build_inlet_index(all_sdes)

    # Breadth-first walk over streams, beginning at this entry's outlet.
    visited_stream_ids: Set[int] = {start_stream.id}
    frontier = deque([start_stream])
    found: List = []
    found_ids: Set[int] = set()

    while frontier:
        stream = frontier.popleft()

        # A stream feeding another entry ends this branch; keep that entry once.
        match = sde_by_inlet.get(stream.id)
        if match and match.id != sde.id:
            if match.id not in found_ids:
                found_ids.add(match.id)
                found.append(match)
            continue

        # No entry starts here, so keep moving downstream.
        for candidate in iter_downstream_streams(stream):
            if candidate.id in visited_stream_ids:
                continue
            visited_stream_ids.add(candidate.id)
            frontier.append(candidate)

    return found