Coverage for backend/django/flowsheetInternals/graphicData/services/auto_sort.py: 89%

116 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1from dataclasses import dataclass 

2import shlex 

3from typing import Iterable 

4from django.db import transaction 

5from django.db.models import Q 

6import pydot 

7 

8from core.auxiliary.enums import ConType 

9from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject 

10from flowsheetInternals.unitops.models.Port import Port 

11 

12 

13GRAPHVIZ_SCALE = 40 

14RECYCLE_VERTICAL_OFFSET = 80 

15RECYCLE_STACK_OFFSET = 40 

16 

17 

18@dataclass(frozen=True) 

19class Position: 

20 x: float 

21 y: float 

22 

23 

24def _graphic_sort_key(graphic_object: GraphicObject) -> tuple[float, float, int]: 

25 return ( 

26 float(graphic_object.y or 0), 

27 float(graphic_object.x or 0), 

28 graphic_object.simulationObject_id or 0, 

29 ) 

30 

31 

32def _get_recycle_tear_object_id(graphic_object: GraphicObject) -> int | None: 

33 simulation_object = getattr(graphic_object, "simulationObject", None) 

34 if getattr(simulation_object, "objectType", None) != "recycle": 

35 return None 

36 

37 try: 

38 recycle_data = simulation_object.recycleData 

39 except AttributeError: 

40 return None 

41 

42 return getattr(recycle_data, "tearObject_id", None) 

43 

44 

45def _split_recycles( 

46 graphic_objects: list[GraphicObject], 

47) -> tuple[list[GraphicObject], dict[int, list[int]]]: 

48 """ 

49 Connected recycle blocks are annotations on a tear stream, so place them 

50 relative to that stream instead of letting them influence the graph layout. 

51 """ 

52 visible_object_ids = {graphic.simulationObject_id for graphic in graphic_objects} 

53 graphviz_graphics = [] 

54 recycle_ids_by_tear_object_id = {} 

55 

56 for graphic in graphic_objects: 

57 tear_object_id = _get_recycle_tear_object_id(graphic) 

58 if tear_object_id is None or tear_object_id not in visible_object_ids: 

59 graphviz_graphics.append(graphic) 

60 continue 

61 

62 recycle_ids_by_tear_object_id.setdefault(tear_object_id, []).append( 

63 graphic.simulationObject_id 

64 ) 

65 

66 return graphviz_graphics, recycle_ids_by_tear_object_id 

67 

68 

69def _iter_connection_edges( 

70 ports: Iterable[Port], visible_object_ids: set[int] 

71) -> Iterable[tuple[int, int]]: 

72 for port in ports: 

73 if ( 

74 port.unitOp_id not in visible_object_ids 

75 or port.stream_id not in visible_object_ids 

76 ): 

77 continue 

78 

79 if port.direction == ConType.Outlet: 

80 yield port.unitOp_id, port.stream_id 

81 elif port.direction == ConType.Inlet: 81 ↛ 72line 81 didn't jump to line 72 because the condition on line 81 was always true

82 yield port.stream_id, port.unitOp_id 

83 

84 

85def _build_graph( 

86 graphic_objects: list[GraphicObject], 

87 ports: Iterable[Port], 

88) -> pydot.Dot: 

89 visible_object_ids = {graphic.simulationObject_id for graphic in graphic_objects} 

90 graph = pydot.Dot( 

91 graph_type="digraph", 

92 rankdir="LR", 

93 ranksep="4.0 equally", 

94 nodesep="2.0", 

95 ) 

96 graph.set_node_defaults( 

97 shape="box", 

98 fixedsize="true", 

99 width="1.5", 

100 height="1.0", 

101 ) 

102 graph.set_edge_defaults(weight="2") 

103 

104 for sort_index, graphic in enumerate(graphic_objects): 

105 object_id = str(graphic.simulationObject_id) 

106 graph.add_node( 

107 pydot.Node( 

108 object_id, 

109 label=object_id, 

110 sortv=str(sort_index), 

111 ) 

112 ) 

113 

114 for source, target in _iter_connection_edges(ports, visible_object_ids): 

115 graph.add_edge(pydot.Edge(str(source), str(target))) 

116 

117 return graph 

118 

119 

120def _parse_plain_positions(plain_output: str) -> dict[int, Position]: 

121 positions = {} 

122 for line in plain_output.splitlines(): 

123 parts = shlex.split(line) 

124 if not parts or parts[0] != "node": 

125 continue 

126 

127 object_id = int(parts[1]) 

128 positions[object_id] = Position( 

129 x=float(parts[2]) * GRAPHVIZ_SCALE, 

130 y=float(parts[3]) * GRAPHVIZ_SCALE, 

131 ) 

132 

133 return positions 

134 

135 

136def _layout_graph(graph: pydot.Dot) -> dict[int, Position]: 

137 plain_output = graph.create(format="plain", prog="dot").decode("utf-8") 

138 return _parse_plain_positions(plain_output) 

139 

140 

141def compute_auto_sort_positions( 

142 graphic_objects: Iterable[GraphicObject], 

143 ports: Iterable[Port], 

144) -> dict[int, Position]: 

145 sorted_graphics = sorted( 

146 [ 

147 graphic 

148 for graphic in graphic_objects 

149 if graphic.simulationObject_id is not None 

150 ], 

151 key=_graphic_sort_key, 

152 ) 

153 if not sorted_graphics: 153 ↛ 154line 153 didn't jump to line 154 because the condition on line 153 was never true

154 return {} 

155 

156 graphviz_graphics, recycle_ids_by_tear_object_id = _split_recycles(sorted_graphics) 

157 if not graphviz_graphics: 157 ↛ 158line 157 didn't jump to line 158 because the condition on line 157 was never true

158 return {} 

159 

160 graphviz_positions = _layout_graph(_build_graph(graphviz_graphics, ports)) 

161 if not graphviz_positions: 161 ↛ 162line 161 didn't jump to line 162 because the condition on line 161 was never true

162 return {} 

163 

164 min_graphviz_x = min(position.x for position in graphviz_positions.values()) 

165 max_graphviz_y = max(position.y for position in graphviz_positions.values()) 

166 anchor_x = min(float(graphic.x or 0) for graphic in sorted_graphics) 

167 anchor_y = min(float(graphic.y or 0) for graphic in sorted_graphics) 

168 

169 positions = { 

170 object_id: Position( 

171 x=round(anchor_x + position.x - min_graphviz_x, 2), 

172 y=round(anchor_y + max_graphviz_y - position.y, 2), 

173 ) 

174 for object_id, position in graphviz_positions.items() 

175 } 

176 

177 for tear_object_id, recycle_object_ids in recycle_ids_by_tear_object_id.items(): 

178 tear_position = positions.get(tear_object_id) 

179 if tear_position is None: 179 ↛ 180line 179 didn't jump to line 180 because the condition on line 179 was never true

180 continue 

181 

182 for stack_index, recycle_object_id in enumerate(sorted(recycle_object_ids)): 

183 positions[recycle_object_id] = Position( 

184 x=tear_position.x, 

185 y=tear_position.y - RECYCLE_VERTICAL_OFFSET - stack_index * RECYCLE_STACK_OFFSET, 

186 ) 

187 

188 return positions 

189 

190 

191def auto_sort(flowsheet_id: int, group_id: int): 

192 graphic_objects = list( 

193 GraphicObject.objects.filter( 

194 group_id=group_id, 

195 visible=True, 

196 simulationObject__is_deleted=False, 

197 ) 

198 .select_related("simulationObject", "simulationObject__recycleData") 

199 .only( 

200 "id", 

201 "x", 

202 "y", 

203 "group_id", 

204 "simulationObject_id", 

205 "simulationObject__id", 

206 "simulationObject__objectType", 

207 "simulationObject__recycleData__id", 

208 "simulationObject__recycleData__tearObject_id", 

209 ) 

210 ) 

211 object_ids = { 

212 graphic_object.simulationObject_id 

213 for graphic_object in graphic_objects 

214 if graphic_object.simulationObject_id is not None 

215 } 

216 

217 ports = Port.objects.none() 

218 if object_ids: 218 ↛ 224line 218 didn't jump to line 224 because the condition on line 218 was always true

219 ports = Port.objects.filter( 

220 Q(unitOp_id__in=object_ids) | Q(stream_id__in=object_ids), 

221 flowsheet_id=flowsheet_id, 

222 ).only("id", "direction", "unitOp_id", "stream_id") 

223 

224 next_positions = compute_auto_sort_positions(graphic_objects, ports) 

225 changed_graphics = [] 

226 moved_objects = [] 

227 

228 for graphic_object in graphic_objects: 

229 object_id = graphic_object.simulationObject_id 

230 if object_id is None or object_id not in next_positions: 230 ↛ 231line 230 didn't jump to line 231 because the condition on line 230 was never true

231 continue 

232 

233 old_position = { 

234 "x": float(graphic_object.x or 0), 

235 "y": float(graphic_object.y or 0), 

236 } 

237 new_position = { 

238 "x": next_positions[object_id].x, 

239 "y": next_positions[object_id].y, 

240 } 

241 if old_position == new_position: 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true

242 continue 

243 

244 graphic_object.x = new_position["x"] 

245 graphic_object.y = new_position["y"] 

246 changed_graphics.append(graphic_object) 

247 moved_objects.append( 

248 { 

249 "objectId": object_id, 

250 "graphicObjectId": graphic_object.id, 

251 "oldPosition": old_position, 

252 "newPosition": new_position, 

253 } 

254 ) 

255 

256 with transaction.atomic(): 

257 if changed_graphics: 257 ↛ 260line 257 didn't jump to line 260

258 GraphicObject.objects.bulk_update(changed_graphics, ["x", "y"]) 

259 

260 return moved_objects