Coverage for backend/django/PinchAnalysis/models/next_stream_data_entry.py: 85%

58 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-12-18 04:00 +0000

1from collections import deque 

2from typing import Iterable, List, Set 

3 

4from flowsheetInternals.unitops.models import Port, SimulationObject 

5from core.auxiliary.enums.unitOpGraphics import ConType 

6from flowsheetInternals.unitops.config.config_methods import get_connected_port_keys 

7from core.auxiliary.enums.unitOpData import SimulationObjectClass 

8 

9 

def same_flow_ports(port):
    """Return the other ports on ``port``'s unit op that are connected to it.

    The set of connected port keys comes from ``get_connected_port_keys``,
    called with this port's key and the unit op's schema. The given port
    itself is excluded from the result.

    Returns a ``Port`` queryset (not evaluated here).
    """
    unit_op = port.unitOp
    connected_keys = get_connected_port_keys(port.key, unit_op.schema)
    candidates = Port.objects.filter(unitOp_id=unit_op.id, key__in=connected_keys)
    # Never hand back the port we started from.
    return candidates.exclude(id=port.id)

15 

def build_inlet_index(sdes: Iterable) -> dict[int, object]:
    """
    Map inlet stream id -> the StreamDataEntry whose inlet is that stream.

    Each entry's ``inlet_outlet_stream`` is unpacked as ``(inlet, outlet)``;
    entries whose inlet is ``None`` are left out. If several entries share an
    inlet stream id, the one that appears last in ``sdes`` wins.
    """
    by_inlet_id = {}
    for entry in sdes:
        upstream, _outlet = entry.inlet_outlet_stream
        if upstream is None:
            # No inlet stream attached: nothing to index for this entry.
            continue
        by_inlet_id[upstream.id] = entry
    return by_inlet_id

26 

27 

def iter_downstream_streams(stream):
    """
    Generate the streams directly downstream of ``stream``.

    For every port attached to ``stream``:
    - ports whose unit op is a Translator are skipped (PP boundary)
    - ports that are not INLET ports are skipped, so traversal only moves
      forward into a unit op
    - the unit op is then crossed via ``same_flow_ports`` and left through
      each OUTLET port that has a stream attached

    Ports and streams are re-queried from the database on every call.
    """
    attached_ports = Port.objects.filter(stream_id=stream.id).select_related("unitOp")
    for entry_port in attached_ports:
        unit_op = entry_port.unitOp

        # Skip property-package boundaries, and anything that is not this
        # stream entering a unit op.
        if (
            unit_op.objectType == SimulationObjectClass.Translator
            or entry_port.direction != ConType.Inlet
        ):
            continue

        for exit_port in same_flow_ports(entry_port):
            # Only leave through outlets; the stream FK may be null when the
            # port is unconnected, so guard on the raw id.
            if exit_port.direction != ConType.Outlet or not exit_port.stream_id:
                continue
            # Load a fresh stream instance rather than reusing a cached one.
            yield SimulationObject.objects.get(pk=exit_port.stream_id)

55 

56 

def next_sdes_for(sde, all_sdes: Iterable = None) -> List:
    """
    Find the StreamDataEntries immediately downstream of ``sde``.

    Starting at ``sde``'s outlet stream, walk breadth-first through
    ``iter_downstream_streams``. A branch ends as soon as it reaches a stream
    that is the inlet of a different StreamDataEntry; that entry is added to
    the result once.

    ``all_sdes`` is the pool of candidate entries; when omitted, every entry
    on ``sde``'s flowsheet is used. Returns an empty list when the
    inlet/outlet streams cannot be read or there is no outlet stream.
    """
    entry_model = type(sde)

    # Re-fetch the entry (with its unit op) so stale related caches are not used.
    sde = entry_model.objects.select_related("unitop").get(pk=sde.pk)

    if all_sdes is None:
        same_flowsheet = entry_model.objects.filter(flowsheet=sde.flowsheet)
        all_sdes = same_flowsheet.select_related("unitop")

    # Any failure while resolving the streams is treated as "nothing downstream".
    try:
        _inlet, start_stream = sde.inlet_outlet_stream
    except Exception:
        return []

    if start_stream is None:
        return []

    inlet_index = build_inlet_index(all_sdes)

    # Breadth-first walk over streams, starting from the outlet.
    downstream: List = []
    recorded_ids: Set[int] = set()
    visited_ids: Set[int] = {start_stream.id}
    frontier = deque([start_stream])

    while frontier:
        stream = frontier.popleft()

        # A stream feeding another entry terminates this branch.
        owner = inlet_index.get(stream.id)
        if owner and owner.id != sde.id:
            if owner.id not in recorded_ids:
                downstream.append(owner)
                recorded_ids.add(owner.id)
            continue

        # No other entry starts here: keep expanding strictly downstream.
        for following in iter_downstream_streams(stream):
            if following.id in visited_ids:
                continue
            visited_ids.add(following.id)
            frontier.append(following)

    return downstream