Coverage for backend/pgraph_factory/pg_sheet.py: 73%

39 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1 

2from .types import PgraphDetails, BlockSchema, ConnectionSchema 

3from flowsheetInternals.unitops.models import SimulationObject 

4from core.auxiliary.models.Flowsheet import Flowsheet 

5from core.auxiliary.models.ProcessPath import ProcessPath 

6import os 

7import requests 

8 

9 

class PgProcess:
    """
    Processes p-graph data for a single flowsheet.

    On construction the flowsheet's simulation objects (everything except
    objects of type "group") are converted into a ``PgraphDetails`` payload
    made of ``blocks`` and ``connections``. ``solve`` posts that payload to
    the p-graph service and stores the returned solutions, and
    ``create_process_paths`` persists each solution as a ``ProcessPath``.
    """

    # objectType values that are reported unchanged as the block "type";
    # every other objectType is reported as "unit".
    _PASSTHROUGH_TYPES = ("stream", "energy_stream", "decisionNode")

    def __init__(self, id: int) -> None:
        """
        Load the flowsheet and build the p-graph payload from its objects.

        Parameters:
        - id (int): Primary key of the Flowsheet to process. (The name
          shadows the builtin, but it is part of the public signature.)
        """
        self._process_details: PgraphDetails = {
            "blocks": [],  # unit, stream, and decisionNode
            "connections": [],
        }
        # list of list of block objects; filled in by solve()
        self.solutions: list[list[SimulationObject]] = []

        self.flowsheet = Flowsheet.objects.get(id=id)

        excluded_types = {"group"}
        self.flowsheet_objects = SimulationObject.objects.filter(
            flowsheet=self.flowsheet
        ).exclude(objectType__in=excluded_types)

        for block in self.flowsheet_objects:
            self.add_block(block)
            self.add_connections(block)

    def add_connections(self, block: SimulationObject) -> None:
        """
        Adds connections for the given block.

        For every port of the block that has a stream attached, one
        ``[source_id, target_id]`` pair is appended to the payload:
        ``[stream.id, block.id]`` for an "inlet" port and
        ``[block.id, stream.id]`` for any other direction.

        Parameters:
        - block (SimulationObject): The block to add connections to.
        """
        connections = self._process_details["connections"]
        for port in block.ports.filter(stream__isnull=False):
            stream = port.stream
            connection: ConnectionSchema
            if port.direction == "inlet":
                connection = [stream.id, block.id]
            else:
                connection = [block.id, stream.id]
            connections.append(connection)

    def add_block(self, block: SimulationObject) -> None:
        """
        Adds a block in the format of BlockSchema to process details.

        The block "type" is the object's own ``objectType`` when it is one of
        "stream", "energy_stream" or "decisionNode", and "unit" otherwise.

        Parameters:
        - block (SimulationObject): The object to describe in the payload.
        """
        if block.objectType in self._PASSTHROUGH_TYPES:
            block_type = block.objectType
        else:
            block_type = "unit"
        block_data: BlockSchema = {
            "id": block.id,
            "name": block.componentName,
            "type": block_type,
        }
        self._process_details["blocks"].append(block_data)

    def solve(self, *, timeout: "float | None" = None) -> None:
        """
        Solves the process graph.

        Posts the payload as JSON to ``<PGRAPH_SERVICE_URL>/solve`` (falling
        back to ``http://localhost:8081`` when the variable is unset or empty)
        and stores the result in ``self.solutions``. The response body is
        iterated as a list of solutions, each a list of object ids that are
        looked up in ``self.flowsheet_objects``.

        Parameters:
        - timeout (float | None): Passed straight to ``requests.post``. The
          default ``None`` keeps the previous behaviour of waiting
          indefinitely.

        Raises:
        - Exception: if the service does not answer with status 200. The
          second argument is the decoded JSON body, or the raw response text
          when the body is not valid JSON.
        """
        url = (os.getenv('PGRAPH_SERVICE_URL') or "http://localhost:8081") + "/solve"
        result = requests.post(url, json=self._process_details, timeout=timeout)
        # check if the request was successful
        if result.status_code != 200:
            # An error page (e.g. from a proxy) may not be JSON; do not let a
            # decode failure hide the fact that the solve request failed.
            try:
                detail = result.json()
            except ValueError:
                detail = result.text
            raise Exception("Error solving the process graph", detail)
        content = result.json()
        self.solutions = [
            [self.flowsheet_objects.get(id=object_id) for object_id in solution]
            for solution in content
        ]

    def create_process_paths(self) -> None:
        """
        Creates the process paths for the solutions.

        All process paths currently reachable through
        ``self.flowsheet.ProcessPaths`` are deleted first, then one
        ``ProcessPath`` is created per entry in ``self.solutions``.
        """
        # delete the existing process paths
        self.flowsheet.ProcessPaths.all().delete()
        # create process paths for each solution
        for solution in self.solutions:
            ProcessPath.create(flowsheet=self.flowsheet, pathwayObjects=solution)

84