Coverage for backend/django/flowsheetInternals/graphicData/services/auto_sort.py: 89%
116 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1from dataclasses import dataclass
2import shlex
3from typing import Iterable
4from django.db import transaction
5from django.db.models import Q
6import pydot
8from core.auxiliary.enums import ConType
9from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject
10from flowsheetInternals.unitops.models.Port import Port
13GRAPHVIZ_SCALE = 40
14RECYCLE_VERTICAL_OFFSET = 80
15RECYCLE_STACK_OFFSET = 40
18@dataclass(frozen=True)
19class Position:
20 x: float
21 y: float
24def _graphic_sort_key(graphic_object: GraphicObject) -> tuple[float, float, int]:
25 return (
26 float(graphic_object.y or 0),
27 float(graphic_object.x or 0),
28 graphic_object.simulationObject_id or 0,
29 )
32def _get_recycle_tear_object_id(graphic_object: GraphicObject) -> int | None:
33 simulation_object = getattr(graphic_object, "simulationObject", None)
34 if getattr(simulation_object, "objectType", None) != "recycle":
35 return None
37 try:
38 recycle_data = simulation_object.recycleData
39 except AttributeError:
40 return None
42 return getattr(recycle_data, "tearObject_id", None)
45def _split_recycles(
46 graphic_objects: list[GraphicObject],
47) -> tuple[list[GraphicObject], dict[int, list[int]]]:
48 """
49 Connected recycle blocks are annotations on a tear stream, so place them
50 relative to that stream instead of letting them influence the graph layout.
51 """
52 visible_object_ids = {graphic.simulationObject_id for graphic in graphic_objects}
53 graphviz_graphics = []
54 recycle_ids_by_tear_object_id = {}
56 for graphic in graphic_objects:
57 tear_object_id = _get_recycle_tear_object_id(graphic)
58 if tear_object_id is None or tear_object_id not in visible_object_ids:
59 graphviz_graphics.append(graphic)
60 continue
62 recycle_ids_by_tear_object_id.setdefault(tear_object_id, []).append(
63 graphic.simulationObject_id
64 )
66 return graphviz_graphics, recycle_ids_by_tear_object_id
69def _iter_connection_edges(
70 ports: Iterable[Port], visible_object_ids: set[int]
71) -> Iterable[tuple[int, int]]:
72 for port in ports:
73 if (
74 port.unitOp_id not in visible_object_ids
75 or port.stream_id not in visible_object_ids
76 ):
77 continue
79 if port.direction == ConType.Outlet:
80 yield port.unitOp_id, port.stream_id
81 elif port.direction == ConType.Inlet: 81 ↛ 72line 81 didn't jump to line 72 because the condition on line 81 was always true
82 yield port.stream_id, port.unitOp_id
85def _build_graph(
86 graphic_objects: list[GraphicObject],
87 ports: Iterable[Port],
88) -> pydot.Dot:
89 visible_object_ids = {graphic.simulationObject_id for graphic in graphic_objects}
90 graph = pydot.Dot(
91 graph_type="digraph",
92 rankdir="LR",
93 ranksep="4.0 equally",
94 nodesep="2.0",
95 )
96 graph.set_node_defaults(
97 shape="box",
98 fixedsize="true",
99 width="1.5",
100 height="1.0",
101 )
102 graph.set_edge_defaults(weight="2")
104 for sort_index, graphic in enumerate(graphic_objects):
105 object_id = str(graphic.simulationObject_id)
106 graph.add_node(
107 pydot.Node(
108 object_id,
109 label=object_id,
110 sortv=str(sort_index),
111 )
112 )
114 for source, target in _iter_connection_edges(ports, visible_object_ids):
115 graph.add_edge(pydot.Edge(str(source), str(target)))
117 return graph
120def _parse_plain_positions(plain_output: str) -> dict[int, Position]:
121 positions = {}
122 for line in plain_output.splitlines():
123 parts = shlex.split(line)
124 if not parts or parts[0] != "node":
125 continue
127 object_id = int(parts[1])
128 positions[object_id] = Position(
129 x=float(parts[2]) * GRAPHVIZ_SCALE,
130 y=float(parts[3]) * GRAPHVIZ_SCALE,
131 )
133 return positions
136def _layout_graph(graph: pydot.Dot) -> dict[int, Position]:
137 plain_output = graph.create(format="plain", prog="dot").decode("utf-8")
138 return _parse_plain_positions(plain_output)
141def compute_auto_sort_positions(
142 graphic_objects: Iterable[GraphicObject],
143 ports: Iterable[Port],
144) -> dict[int, Position]:
145 sorted_graphics = sorted(
146 [
147 graphic
148 for graphic in graphic_objects
149 if graphic.simulationObject_id is not None
150 ],
151 key=_graphic_sort_key,
152 )
153 if not sorted_graphics: 153 ↛ 154line 153 didn't jump to line 154 because the condition on line 153 was never true
154 return {}
156 graphviz_graphics, recycle_ids_by_tear_object_id = _split_recycles(sorted_graphics)
157 if not graphviz_graphics: 157 ↛ 158line 157 didn't jump to line 158 because the condition on line 157 was never true
158 return {}
160 graphviz_positions = _layout_graph(_build_graph(graphviz_graphics, ports))
161 if not graphviz_positions: 161 ↛ 162line 161 didn't jump to line 162 because the condition on line 161 was never true
162 return {}
164 min_graphviz_x = min(position.x for position in graphviz_positions.values())
165 max_graphviz_y = max(position.y for position in graphviz_positions.values())
166 anchor_x = min(float(graphic.x or 0) for graphic in sorted_graphics)
167 anchor_y = min(float(graphic.y or 0) for graphic in sorted_graphics)
169 positions = {
170 object_id: Position(
171 x=round(anchor_x + position.x - min_graphviz_x, 2),
172 y=round(anchor_y + max_graphviz_y - position.y, 2),
173 )
174 for object_id, position in graphviz_positions.items()
175 }
177 for tear_object_id, recycle_object_ids in recycle_ids_by_tear_object_id.items():
178 tear_position = positions.get(tear_object_id)
179 if tear_position is None: 179 ↛ 180line 179 didn't jump to line 180 because the condition on line 179 was never true
180 continue
182 for stack_index, recycle_object_id in enumerate(sorted(recycle_object_ids)):
183 positions[recycle_object_id] = Position(
184 x=tear_position.x,
185 y=tear_position.y - RECYCLE_VERTICAL_OFFSET - stack_index * RECYCLE_STACK_OFFSET,
186 )
188 return positions
191def auto_sort(flowsheet_id: int, group_id: int):
192 graphic_objects = list(
193 GraphicObject.objects.filter(
194 group_id=group_id,
195 visible=True,
196 simulationObject__is_deleted=False,
197 )
198 .select_related("simulationObject", "simulationObject__recycleData")
199 .only(
200 "id",
201 "x",
202 "y",
203 "group_id",
204 "simulationObject_id",
205 "simulationObject__id",
206 "simulationObject__objectType",
207 "simulationObject__recycleData__id",
208 "simulationObject__recycleData__tearObject_id",
209 )
210 )
211 object_ids = {
212 graphic_object.simulationObject_id
213 for graphic_object in graphic_objects
214 if graphic_object.simulationObject_id is not None
215 }
217 ports = Port.objects.none()
218 if object_ids: 218 ↛ 224line 218 didn't jump to line 224 because the condition on line 218 was always true
219 ports = Port.objects.filter(
220 Q(unitOp_id__in=object_ids) | Q(stream_id__in=object_ids),
221 flowsheet_id=flowsheet_id,
222 ).only("id", "direction", "unitOp_id", "stream_id")
224 next_positions = compute_auto_sort_positions(graphic_objects, ports)
225 changed_graphics = []
226 moved_objects = []
228 for graphic_object in graphic_objects:
229 object_id = graphic_object.simulationObject_id
230 if object_id is None or object_id not in next_positions: 230 ↛ 231line 230 didn't jump to line 231 because the condition on line 230 was never true
231 continue
233 old_position = {
234 "x": float(graphic_object.x or 0),
235 "y": float(graphic_object.y or 0),
236 }
237 new_position = {
238 "x": next_positions[object_id].x,
239 "y": next_positions[object_id].y,
240 }
241 if old_position == new_position: 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true
242 continue
244 graphic_object.x = new_position["x"]
245 graphic_object.y = new_position["y"]
246 changed_graphics.append(graphic_object)
247 moved_objects.append(
248 {
249 "objectId": object_id,
250 "graphicObjectId": graphic_object.id,
251 "oldPosition": old_position,
252 "newPosition": new_position,
253 }
254 )
256 with transaction.atomic():
257 if changed_graphics: 257 ↛ 260line 257 didn't jump to line 260
258 GraphicObject.objects.bulk_update(changed_graphics, ["x", "y"])
260 return moved_objects