Coverage for backend/django/core/auxiliary/viewsets/TaskViewSet.py: 97%

54 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1from django.views.decorators.csrf import csrf_exempt 

2from idaes_factory.endpoints import process_cancel_solve_response 

3from drf_spectacular.types import OpenApiTypes 

4from drf_spectacular.utils import extend_schema, OpenApiParameter 

5from rest_framework import status 

6from rest_framework.decorators import action, authentication_classes 

7from rest_framework.permissions import IsAuthenticated 

8from rest_framework.response import Response 

9 

10from authentication.custom_drf_authentication import DaprApiTokenAuthentication 

11from common.models.general import TaskEvent 

12from core.auxiliary.managers import TaskManager 

13from core.pagination import ViewSetPagination 

14from core.auxiliary.models.Task import Task, TaskType 

15from core.auxiliary.serializers import TaskSerializer 

16from core.validation import api_view_ignore_access_control 

17from core.viewset import ReadOnlyModelViewSet 

18 

19 

20class TaskViewSet(ReadOnlyModelViewSet): 

21 serializer_class = TaskSerializer 

22 pagination_class = ViewSetPagination 

23 

24 # Filter to only show tasks for a specific flowsheet 

25 def get_queryset(self): 

26 queryset = Task.objects.all().select_related("metadata") 

27 

28 if self.action == 'list': 

29 non_child_tasks = queryset.filter(parent__isnull=True).exclude( 

30 task_type=TaskType.BUILD_STATE, 

31 ) 

32 return non_child_tasks 

33 

34 return queryset 

35 

36 @extend_schema( 

37 parameters=[ 

38 OpenApiParameter(name="flowsheet", required=True, type=OpenApiTypes.INT), 

39 ] 

40 ) 

41 def list(self, request): 

42 return super().list(request) 

43 

44 @extend_schema(responses=TaskSerializer(many=True)) 

45 @action(detail=True, methods=['get'], url_path='children') 

46 def get_task_children(self, request, pk): 

47 children = Task.objects.filter(parent_id=pk) 

48 

49 page = self.paginate_queryset(children) 

50 if page is not None: 50 ↛ 54line 50 didn't jump to line 54 because the condition on line 50 was always true

51 serializer = self.get_serializer(page, many=True) 

52 return self.get_paginated_response(serializer.data) 

53 

54 return Response(status=status.HTTP_400_BAD_REQUEST) 

55 

56 @action( 

57 detail=False, 

58 methods=['post'], 

59 url_path='running', 

60 authentication_classes=[DaprApiTokenAuthentication], 

61 permission_classes=[IsAuthenticated], 

62 ) 

63 @extend_schema(exclude=True) 

64 @csrf_exempt 

65 @api_view_ignore_access_control 

66 def handle_task_running_event(self, request): 

67 """Accept trusted Dapr task-running events from backend workers.""" 

68 task_running_event = TaskEvent.model_validate(request.data) 

69 print(f"Received task-running event for task ID: {task_running_event.data.task_id} time: {task_running_event.time}") 

70 TaskManager.handle_task_running_event(task_running_event.data.task_id) 

71 

72 return Response(status=status.HTTP_200_OK) 

73 

74 @action( 

75 detail=False, 

76 methods=['post'], 

77 url_path='cancelled', 

78 authentication_classes=[DaprApiTokenAuthentication], 

79 permission_classes=[IsAuthenticated], 

80 ) 

81 @extend_schema(exclude=True) 

82 @csrf_exempt 

83 @api_view_ignore_access_control 

84 def handle_task_cancelled_event(self, request): 

85 """Accept trusted Dapr task-cancelled events from the IDAES service.""" 

86 task_cancelled_event = TaskEvent.model_validate(request.data) 

87 process_cancel_solve_response(task_cancelled_event.data) 

88 

89 return Response(status=status.HTTP_200_OK)