Coverage for backend/flowsheetInternals/unitops/viewsets/PortViewSet.py: 57%
160 statements
from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiTypes
from core.viewset import ModelViewSet
from flowsheetInternals.unitops.models.Port import Port
from flowsheetInternals.unitops.serializers.PortSerializer import PortSerializer
from rest_framework.response import Response
from rest_framework import serializers
import traceback
from rest_framework.decorators import action
from django.db import transaction

from core.auxiliary.models.PropertyInfo import PropertyInfo
from core.auxiliary.models.PropertySet import PropertySet

from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject
from common.config_types import *
from ..config.config_methods import *
from flowsheetInternals.unitops.models.delete_factory import DeleteFactory

class MergeStreamsSerializer(serializers.Serializer):
    stream1 = serializers.IntegerField(required=True)  # material stream
    stream2 = serializers.IntegerField(required=True)  # material stream

class SplitStreamSerializer(serializers.Serializer):
    stream = serializers.IntegerField(required=True)

class AddStreamSerializer(serializers.Serializer):
    port = serializers.IntegerField(required=True)

class CreateDNSerializer(serializers.Serializer):
    stream = serializers.IntegerField(required=True)

class ConvertToDNSerializer(serializers.Serializer):
    stream = serializers.IntegerField(required=True)

class RestoreConnectionsSerializer(serializers.Serializer):
    connections = serializers.ListField(
        child=serializers.DictField(child=serializers.JSONField()),
        required=True
    )

class PortViewSet(ModelViewSet):
    serializer_class = PortSerializer

    def get_queryset(self):
        # Exclude ports whose parent unit operation has been soft-deleted.
        return Port.objects.filter(unitOp__is_deleted=False)

    @extend_schema(
        parameters=[
            OpenApiParameter(name="flowsheet", required=True, type=OpenApiTypes.INT),
        ]
    )
    def list(self, request, *args, **kwargs):
        return super().list(request, *args, **kwargs)
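
    # Example request (a sketch; the "/api/ports/" prefix is an assumption about
    # the router configuration, and the flowsheet id is hypothetical):
    #   GET /api/ports/?flowsheet=3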

    def error_response(self, e):
        """Builds a 400 response carrying the exception message and traceback."""
        tb_info = traceback.format_exc()
        error_message = str(e)
        response_data = {'status': 'error', 'message': error_message, 'traceback': tb_info}
        return Response(response_data, status=400)

    @extend_schema(request=MergeStreamsSerializer, responses=None)
    @action(methods=['post'], detail=False, url_path='merge-streams')
    def merge_streams(self, request):
        """
        Connects two material streams together, managing the connection/disconnection
        of UnitOperations.
        Logic: keep the outlet stream and the position of stream 2.
        For Product/Feed -> Intermediary: keep streams 1 and 2, create a new outlet stream.
        """
        try:
            serializer = MergeStreamsSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)
            validated_data = serializer.validated_data

            ids = [validated_data.get('stream1'), validated_data.get('stream2')]

            # Fetch both streams in a single query with related data
            streams = SimulationObject.objects.filter(id__in=ids).select_related(
                "flowsheet"
            ).prefetch_related(
                "connectedPorts__unitOp",
                "graphicObject__group"
            )

            stream_map = {stream.id: stream for stream in streams}
            stream1 = stream_map[ids[0]]
            stream2 = stream_map[ids[1]]

            # Wrap the complex merge operation in a transaction to reduce serialization conflicts
            with transaction.atomic():
                stream1.merge_stream(stream2)

            return Response({'status': 'success'}, status=200)
        except Exception as e:
            return self.error_response(e)
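
    # Example request (a sketch; the route prefix and stream ids are hypothetical):
    #   POST /api/ports/merge-streams/
    #   {"stream1": 101, "stream2": 102}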

    @extend_schema(request=SplitStreamSerializer, responses=None)
    @action(methods=['post'], detail=False, url_path='split-stream')
    def split_stream(self, request):
        """
        Splits a stream into two separate, disconnected streams (one inlet and one outlet).
        """
        try:
            serializer = SplitStreamSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)
            validated_data = serializer.validated_data

            stream = SimulationObject.objects.get(id=validated_data.get('stream'))
            stream.split_stream()
            return Response({'status': 'success'}, status=200)
        except Exception as e:
            return self.error_response(e)
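
    # Example request (a sketch; the route prefix and stream id are hypothetical):
    #   POST /api/ports/split-stream/
    #   {"stream": 101}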

    @extend_schema(request=AddStreamSerializer, responses=None)
    @action(methods=['post'], detail=False, url_path='add-stream')
    def add_stream(self, request) -> Response:
        """
        Creates a new stream for a port that previously had no stream attached.
        """
        try:
            # Validate input against the serializer declared in the schema.
            serializer = AddStreamSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)

            port = Port.objects.get(id=serializer.validated_data.get('port'))
            SimulationObjectFactory.create_stream_at_port(port)
            return Response({'status': 'success'}, status=200)
        except Exception as e:
            return self.error_response(e)
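
    # Example request (a sketch; the route prefix and port id are hypothetical):
    #   POST /api/ports/add-stream/
    #   {"port": 7}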

    @extend_schema(request=ConvertToDNSerializer, responses=None)
    @action(methods=['post'], detail=False, url_path='convert-to-dn')
    def convert_to_dn(self, request) -> Response:
        """
        Turns an Intermediary stream into a decision node.
        """
        try:
            serializer = ConvertToDNSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)
            validated_data = serializer.validated_data

            # Get the stream and its connected port (if any)
            stream_id = validated_data.get('stream')
            stream = SimulationObject.objects.get(id=stream_id)
            stream_port = stream.connectedPorts.first()

            # Default to one inlet and one outlet; a stream with a single
            # connected port keeps only the side opposite that port.
            num_inlets, num_outlets = 1, 1
            if stream_port is not None and stream.connectedPorts.count() <= 1:
                if stream_port.direction == ConType.Inlet:
                    num_inlets, num_outlets = 0, 1
                else:
                    num_inlets, num_outlets = 1, 0

            # Create decision node
            stream.make_decision_node(num_inlets=num_inlets, num_outlets=num_outlets)

            return Response({'status': 'success'}, status=200)
        except Exception as e:
            return self.error_response(e)
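
    # Example request (a sketch; the route prefix and stream id are hypothetical):
    #   POST /api/ports/convert-to-dn/
    #   {"stream": 101}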

    def destroy(self, request, *args, **kwargs):
        """
        Overrides the destroy method to allow deleting attached streams and updating port indexes.
        """
        try:
            port: Port = self.get_object()
            stream = port.stream

            port.reindex_port_on_delete()
            port.unitOp.update_height()

            if stream and stream.connectedPorts.count() == 0:
                # If the stream has no connected ports, delete it
                DeleteFactory.delete_object(stream)

            # A 204 No Content response must not carry a body.
            return Response(status=204)
        except Exception as e:
            return self.error_response(e)
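
    # Example request (a sketch; the route prefix and port id are hypothetical).
    # Reindexes the remaining ports and removes the attached stream if it is
    # left with no connected ports:
    #   DELETE /api/ports/7/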

    @extend_schema(request=RestoreConnectionsSerializer, responses=None)
    @action(methods=['post'], detail=False, url_path='restore-connections')
    def restore_connections(self, request):
        """
        Enhanced restoration with detailed error tracking and dependency handling.
        Restores port connections during undo operations without creating recycle tears.
        This endpoint bypasses the normal merge_stream logic, which can create recycle blocks.
        """
        try:
            with transaction.atomic():  # Ensure all-or-nothing operation
                serializer = RestoreConnectionsSerializer(data=request.data)
                serializer.is_valid(raise_exception=True)
                validated_data = serializer.validated_data

                connections = validated_data.get('connections', [])

                # Bulk-fetch DB objects to reduce queries
                port_ids = {c.get('portId') for c in connections if c.get('portId') is not None}
                stream_ids = {c.get('streamId') for c in connections if c.get('streamId') is not None}

                # Fetch all ports and streams in single queries
                port_map = Port.objects.filter(id__in=port_ids).select_related('unitOp__flowsheet').in_bulk()
                stream_map = (
                    SimulationObject.objects.filter(id__in=stream_ids)
                    .select_related('flowsheet')
                    .in_bulk()
                )

                updated_ports = []
                restored_count = 0
                failed_connections = []
                skipped_connections = []

                for connection in connections:
                    port_id = connection.get('portId')
                    stream_id = connection.get('streamId')

                    if port_id is None:
                        skipped_connections.append({
                            'portId': port_id,
                            'streamId': stream_id,
                            'reason': 'Invalid port ID'
                        })
                        continue

                    port = port_map.get(port_id)
                    if not port:
                        failed_connections.append({
                            'portId': port_id,
                            'streamId': stream_id,
                            'error': f'Port {port_id} not found',
                            'retry_recommended': False  # Port truly doesn't exist
                        })
                        continue

                    # Handle disconnection
                    if stream_id is None:
                        port.stream = None
                        updated_ports.append(port)
                        restored_count += 1
                        continue

                    # Handle connection
                    stream = stream_map.get(stream_id)
                    if not stream:
                        failed_connections.append({
                            'portId': port_id,
                            'streamId': stream_id,
                            'error': f'Stream {stream_id} not found',
                            'retry_recommended': True
                        })
                        continue

                    # Ensure same flowsheet (security check)
                    if port.unitOp.flowsheet_id != stream.flowsheet_id:
                        failed_connections.append({
                            'portId': port_id,
                            'streamId': stream_id,
                            'error': 'Port and Stream belong to different flowsheets',
                            'retry_recommended': False
                        })
                        continue

                    port.stream = stream
                    updated_ports.append(port)
                    restored_count += 1

                # Bulk-update all ports in a single query
                if updated_ports:
                    Port.objects.bulk_update(updated_ports, ['stream'])

                response_data = {
                    'status': 'success' if restored_count else 'failed',
                    'restored_count': restored_count,
                    'total_connections': len(connections),
                    'failed_connections': failed_connections,
                    'skipped_connections': skipped_connections,
                    'retry_candidates': [f for f in failed_connections if f.get('retry_recommended')]
                }

                return Response(response_data, status=200)
        except Exception as e:
            return self.error_response(e)
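
    # Example request (a sketch; the route prefix and ids are hypothetical).
    # A null streamId disconnects the port; an integer streamId reconnects it:
    #   POST /api/ports/restore-connections/
    #   {"connections": [{"portId": 7, "streamId": 42},
    #                    {"portId": 8, "streamId": null}]}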