Coverage for backend/django/core/auxiliary/viewsets/TaskViewSet.py: 93%

53 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

1from django.views.decorators.csrf import csrf_exempt 

2from idaes_factory.endpoints import process_cancel_solve_response 

3from drf_spectacular.types import OpenApiTypes 

4from drf_spectacular.utils import extend_schema, OpenApiParameter 

5from rest_framework import status 

6from rest_framework.decorators import action, authentication_classes 

7from rest_framework.permissions import IsAuthenticated 

8from rest_framework.response import Response 

9 

10from authentication.custom_drf_authentication import DaprApiTokenAuthentication 

11from common.models.general import TaskEvent 

12from core.auxiliary.managers import TaskManager 

13from core.pagination import ViewSetPagination 

14from core.auxiliary.models.Task import Task, TaskType 

15from core.auxiliary.serializers import TaskSerializer 

16from core.validation import api_view_ignore_access_control 

17from core.viewset import ReadOnlyModelViewSet 

18 

19 

class TaskViewSet(ReadOnlyModelViewSet):
    """Read-only API for ``Task`` records plus Dapr-authenticated event hooks.

    Exposes the list/retrieve routes inherited from ``ReadOnlyModelViewSet``,
    a ``children`` detail route, and two POST routes (``running`` and
    ``cancelled``) that receive task events.
    """

    serializer_class = TaskSerializer
    pagination_class = ViewSetPagination

    def get_queryset(self):
        """Return the tasks visible to the current action.

        For the ``list`` action only top-level tasks (no parent) are returned
        and ``BUILD_STATE`` tasks are hidden; every other action sees all
        tasks. ``metadata`` is joined up-front in both cases.

        NOTE(review): no flowsheet filter is applied here even though ``list``
        documents a required ``flowsheet`` parameter -- presumably the base
        viewset or the model manager scopes the queryset; confirm.
        """
        queryset = Task.objects.all().select_related("metadata")

        if self.action == 'list':
            non_child_tasks = queryset.filter(parent__isnull=True).exclude(
                task_type=TaskType.BUILD_STATE,
            )
            return non_child_tasks

        return queryset

    @extend_schema(
        parameters=[
            OpenApiParameter(name="flowsheet", required=True, type=OpenApiTypes.INT),
        ]
    )
    def list(self, request):
        """List tasks; overridden only to document the ``flowsheet`` parameter."""
        return super().list(request)

    @extend_schema(responses=TaskSerializer(many=True))
    @action(detail=True, methods=['get'], url_path='children')
    def get_task_children(self, request, pk):
        """Return the tasks whose parent is the task identified by ``pk``.

        The result is paginated when the configured paginator applies to the
        request; otherwise the full list of children is serialized and
        returned with a 200 response.
        """
        # Join ``metadata`` here as well, mirroring ``get_queryset``, so the
        # same serializer does not trigger one extra query per child.
        children = Task.objects.filter(parent_id=pk).select_related("metadata")

        page = self.paginate_queryset(children)
        if page is not None:
            serializer = self.get_serializer(page, many=True)
            return self.get_paginated_response(serializer.data)

        # ``paginate_queryset`` returns None when pagination is not applied.
        # That is still a valid request, so answer with the unpaginated list
        # instead of an empty 400 response.
        serializer = self.get_serializer(children, many=True)
        return Response(serializer.data)

    @action(
        detail=False,
        methods=['post'],
        url_path='running',
        authentication_classes=[DaprApiTokenAuthentication],
        permission_classes=[IsAuthenticated],
    )
    @extend_schema(exclude=True)
    @csrf_exempt
    @api_view_ignore_access_control
    def handle_task_running_event(self, request):
        """Accept trusted Dapr task-running events from backend workers.

        The request body is validated into a ``TaskEvent`` and the event's
        ``task_id`` is handed to ``TaskManager.handle_task_running_event``.
        Validation errors are not caught in this method. Responds with
        200 OK and an empty body on success.
        """
        task_running_event = TaskEvent.model_validate(request.data)
        TaskManager.handle_task_running_event(task_running_event.data.task_id)

        return Response(status=status.HTTP_200_OK)

    @action(
        detail=False,
        methods=['post'],
        url_path='cancelled',
        authentication_classes=[DaprApiTokenAuthentication],
        permission_classes=[IsAuthenticated],
    )
    @extend_schema(exclude=True)
    @csrf_exempt
    @api_view_ignore_access_control
    def handle_task_cancelled_event(self, request):
        """Accept trusted Dapr task-cancelled events from the IDAES service.

        The request body is validated into a ``TaskEvent`` and its ``data``
        payload is passed to ``process_cancel_solve_response``. Validation
        errors are not caught in this method. Responds with 200 OK and an
        empty body on success.
        """
        task_cancelled_event = TaskEvent.model_validate(request.data)
        process_cancel_solve_response(task_cancelled_event.data)

        return Response(status=status.HTTP_200_OK)

86 process_cancel_solve_response(task_cancelled_event.data) 

87 

88 return Response(status=status.HTTP_200_OK)