Coverage for backend/pgraph_factory/pg_sheet.py: 73%
39 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
2from .types import PgraphDetails, BlockSchema, ConnectionSchema
3from flowsheetInternals.unitops.models import SimulationObject
4from core.auxiliary.models.Flowsheet import Flowsheet
5from core.auxiliary.models.ProcessPath import ProcessPath
6import os
7import requests
10"""
11This class processes p-graph data
12"""
class PgProcess:
    """
    Builds the p-graph payload for one flowsheet, submits it to the p-graph
    solver service, and stores the returned solutions as process paths.

    Attributes:
    - flowsheet: The Flowsheet row loaded by id in the constructor.
    - flowsheet_objects: Queryset of the flowsheet's SimulationObjects,
      excluding objects whose objectType is "group".
    - solutions: One list of SimulationObjects per solution returned by the
      solver; empty until solve() has run.
    """

    # Object types that keep their own type name in the payload; every other
    # objectType is reported to the solver as a generic "unit".
    _PASSTHROUGH_TYPES = ("stream", "energy_stream", "decisionNode")

    # Upper bound (seconds) on a solver request unless the caller overrides it.
    _DEFAULT_SOLVE_TIMEOUT = 300.0

    def __init__(self, id: int) -> None:
        """
        Loads the flowsheet and registers every non-group object as a block,
        together with its stream connections.

        Parameters:
        - id (int): Primary key of the Flowsheet to process.
        """
        self._process_details: PgraphDetails = {
            "blocks": [],  # unit, stream, and decisionNode
            "connections": [],
        }
        self.solutions: list[list[SimulationObject]] = []  # list of list of block objects

        self.flowsheet = Flowsheet.objects.get(id=id)

        exclude = {"group"}
        self.flowsheet_objects = SimulationObject.objects.filter(
            flowsheet=self.flowsheet
        ).exclude(objectType__in=exclude)

        for block in self.flowsheet_objects:
            self.add_block(block)
            self.add_connections(block)

    def add_connections(self, block: SimulationObject) -> None:
        """
        Adds connections for the given block.

        Only ports that have a stream attached are considered. Each connection
        is an ordered id pair: [stream, block] for a port whose direction is
        "inlet", and [block, stream] for any other direction.

        Parameters:
        - block (SimulationObject): The block to add connections to.
        """
        ports = block.ports.filter(stream__isnull=False)
        for port in ports:
            stream = port.stream
            if port.direction == "inlet":
                connection: ConnectionSchema = [stream.id, block.id]
            else:
                connection = [block.id, stream.id]
            self._process_details["connections"].append(connection)

    def add_block(self, block: SimulationObject) -> None:
        """
        Adds a block in the format of BlockSchema to process details.

        Parameters:
        - block (SimulationObject): The object to describe; its id,
          componentName and objectType are read.
        """
        if block.objectType in self._PASSTHROUGH_TYPES:
            block_type = block.objectType
        else:
            block_type = "unit"
        block_data: BlockSchema = {
            "id": block.id,
            "name": block.componentName,
            "type": block_type,
        }
        self._process_details["blocks"].append(block_data)

    @staticmethod
    def _error_payload(response):
        """
        Returns the decoded JSON body of a failed response, or its raw text
        when the body is not valid JSON (so the decode error cannot hide the
        original failure).
        """
        try:
            return response.json()
        except ValueError:  # JSON decode errors are ValueError subclasses
            return response.text

    def solve(self, timeout: "float | None" = _DEFAULT_SOLVE_TIMEOUT) -> None:
        """
        Solves the process graph.

        Posts the collected process details as JSON to "<base>/solve", where
        <base> is the PGRAPH_SERVICE_URL environment variable or
        "http://localhost:8081" when that is unset or empty. The response body
        is read as a list of solutions, each a list of object ids, and every
        id is resolved through self.flowsheet_objects into self.solutions.

        Parameters:
        - timeout (float | None): Seconds to wait for the solver service
          before requests raises a timeout error. Pass None to wait without
          limit.

        Raises:
        - Exception: If the service answers with a status other than 200; the
          second argument carries the response body (JSON if decodable,
          otherwise plain text).
        """
        url = (os.getenv('PGRAPH_SERVICE_URL') or "http://localhost:8081") + "/solve"
        result = requests.post(url, json=self._process_details, timeout=timeout)
        # check if the request was successful
        if result.status_code != 200:
            raise Exception("Error solving the process graph", self._error_payload(result))
        content = result.json()
        self.solutions = [
            [self.flowsheet_objects.get(id=object_id) for object_id in solution]
            for solution in content
        ]

    def create_process_paths(self) -> None:
        """
        Creates the process paths for the solutions.

        All process paths currently attached to the flowsheet are deleted
        first, then one ProcessPath is created per entry in self.solutions.
        """
        # delete the existing process paths
        self.flowsheet.ProcessPaths.all().delete()
        # create process paths for each solution
        for solution in self.solutions:
            ProcessPath.create(flowsheet=self.flowsheet, pathwayObjects=solution)