Coverage for backend/django/core/auxiliary/viewsets/TaskViewSet.py: 93%
53 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1from django.views.decorators.csrf import csrf_exempt
2from idaes_factory.endpoints import process_cancel_solve_response
3from drf_spectacular.types import OpenApiTypes
4from drf_spectacular.utils import extend_schema, OpenApiParameter
5from rest_framework import status
6from rest_framework.decorators import action, authentication_classes
7from rest_framework.permissions import IsAuthenticated
8from rest_framework.response import Response
10from authentication.custom_drf_authentication import DaprApiTokenAuthentication
11from common.models.general import TaskEvent
12from core.auxiliary.managers import TaskManager
13from core.pagination import ViewSetPagination
14from core.auxiliary.models.Task import Task, TaskType
15from core.auxiliary.serializers import TaskSerializer
16from core.validation import api_view_ignore_access_control
17from core.viewset import ReadOnlyModelViewSet
20class TaskViewSet(ReadOnlyModelViewSet):
21 serializer_class = TaskSerializer
22 pagination_class = ViewSetPagination
24 # Filter to only show tasks for a specific flowsheet
25 def get_queryset(self):
26 queryset = Task.objects.all().select_related("metadata")
28 if self.action == 'list': 28 ↛ 34line 28 didn't jump to line 34 because the condition on line 28 was always true
29 non_child_tasks = queryset.filter(parent__isnull=True).exclude(
30 task_type=TaskType.BUILD_STATE,
31 )
32 return non_child_tasks
34 return queryset
36 @extend_schema(
37 parameters=[
38 OpenApiParameter(name="flowsheet", required=True, type=OpenApiTypes.INT),
39 ]
40 )
41 def list(self, request):
42 return super().list(request)
44 @extend_schema(responses=TaskSerializer(many=True))
45 @action(detail=True, methods=['get'], url_path='children')
46 def get_task_children(self, request, pk):
47 children = Task.objects.filter(parent_id=pk)
49 page = self.paginate_queryset(children)
50 if page is not None: 50 ↛ 54line 50 didn't jump to line 54 because the condition on line 50 was always true
51 serializer = self.get_serializer(page, many=True)
52 return self.get_paginated_response(serializer.data)
54 return Response(status=status.HTTP_400_BAD_REQUEST)
56 @action(
57 detail=False,
58 methods=['post'],
59 url_path='running',
60 authentication_classes=[DaprApiTokenAuthentication],
61 permission_classes=[IsAuthenticated],
62 )
63 @extend_schema(exclude=True)
64 @csrf_exempt
65 @api_view_ignore_access_control
66 def handle_task_running_event(self, request):
67 """Accept trusted Dapr task-running events from backend workers."""
68 task_running_event = TaskEvent.model_validate(request.data)
69 TaskManager.handle_task_running_event(task_running_event.data.task_id)
71 return Response(status=status.HTTP_200_OK)
73 @action(
74 detail=False,
75 methods=['post'],
76 url_path='cancelled',
77 authentication_classes=[DaprApiTokenAuthentication],
78 permission_classes=[IsAuthenticated],
79 )
80 @extend_schema(exclude=True)
81 @csrf_exempt
82 @api_view_ignore_access_control
83 def handle_task_cancelled_event(self, request):
84 """Accept trusted Dapr task-cancelled events from the IDAES service."""
85 task_cancelled_event = TaskEvent.model_validate(request.data)
86 process_cancel_solve_response(task_cancelled_event.data)
88 return Response(status=status.HTTP_200_OK)