Coverage for backend/idaes_service/application.py: 93%

71 statements  

coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

import logging
import traceback

from fastapi.exceptions import RequestValidationError
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR

from idaes_service import settings
from fastapi import FastAPI, Response
from fastapi.exception_handlers import (
    http_exception_handler,
    request_validation_exception_handler
)
from common.models.idaes import FlowsheetSchema
from common.models.idaes.payloads import BuildStateRequestSchema
from common.models.idaes.payloads.solve_request_schema import IdaesSolveEvent
from common.services import messaging
from common.models.idaes.payloads.ml_request_schema import MLTrainingEvent
from idaes_service.endpoints import solve_endpoint, build_state_endpoint, generate_python_code_endpoint, ml_endpoint

settings.load()

app = FastAPI()

def log_exception(request: Request, exc: Exception):
    logging.error(f"Error while handling {request.url}: {exc}", exc_info=True)

@app.exception_handler(HTTPException)
async def handle_http_exception(request: Request, exc):
    log_exception(request, exc)

    return await http_exception_handler(request, exc)

@app.exception_handler(RequestValidationError)
async def handle_request_validation_exception(request, exc):
    log_exception(request, exc)

    return await request_validation_exception_handler(request, exc)

@app.exception_handler(Exception)
async def handle_unhandled_exception(request: Request, exc: Exception):
    log_exception(request, exc)

    # Return a 500 Internal Server Error response with the traceback in the detail
    return JSONResponse(
        {"detail": traceback.format_exc()}, status_code=HTTP_500_INTERNAL_SERVER_ERROR
    )

@app.get("/health")
def health_check():
    return {"status": "healthy"}
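
The handlers and health route above can be exercised with Starlette's test client. A minimal sketch, assuming httpx is installed for TestClient; a request to an unregistered path raises Starlette's HTTPException(404) inside the app, so it flows through handle_http_exception and gets logged before the default handler builds the response:

from fastapi.testclient import TestClient

from idaes_service.application import app

client = TestClient(app)

# The health route returns a static payload.
assert client.get("/health").json() == {"status": "healthy"}

# An unmatched path is raised as HTTPException(404), so it is logged by
# handle_http_exception and then answered by the default handler.
assert client.get("/does-not-exist").status_code == 404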

@app.post("/solve_model")
def solve_model(solve_request: IdaesSolveEvent, response: Response):
    """
    Endpoint handler for solving a flowsheet model. This endpoint is typically invoked by Dapr
    as part of an asynchronous solving workflow. The result is likewise communicated back to Django
    asynchronously via our messaging system.
    """

    # Report that this task is now being handled
    messaging.send_task_running_message(solve_request.data.task_id)

    result = solve_endpoint(solve_request.data, response)

    messaging.send_idaes_solve_completion_message(result)

    # Regardless of the result, we always return HTTP 200 to Dapr to indicate the request was processed;
    # solve failures are handled by the consumer of the completion message.
    response.status_code = 200
    # Returning the Response object isn't standard FastAPI practice, but Dapr appears to discard the body anyway.
    return response
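
As the comment above notes, returning the injected Response object is unusual; the more common FastAPI pattern is to set the status code on the injected Response and return a JSON-serialisable body or nothing. Dapr's HTTP pub/sub also recognises an optional {"status": "SUCCESS" | "RETRY" | "DROP"} body, although a plain 200 is already treated as successful delivery. A sketch of that variant on a standalone demo app, purely illustrative and not a change to this module:

from fastapi import FastAPI, Response

demo = FastAPI()  # throwaway app for illustration only

@demo.post("/ack")
def ack(response: Response):
    response.status_code = 200
    # FastAPI serialises the returned dict; the injected Response object only
    # carries the status code override. "SUCCESS" is Dapr's explicit ack value.
    return {"status": "SUCCESS"}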

@app.post("/build_state")
def build_state(build_state_request: BuildStateRequestSchema, response: Response):
    return build_state_endpoint(build_state_request, response)

@app.post("/generate_python_code")
def generate_python_code(generate_python_code_request: FlowsheetSchema, response: Response):
    return generate_python_code_endpoint(generate_python_code_request, response)

@app.post("/ml_train")
def ml_train(ml_request: MLTrainingEvent, response: Response):
    """
    Endpoint handler for training a machine learning model. This endpoint is invoked directly by
    a Dapr sidecar as part of an asynchronous ML training workflow. The result is communicated
    back to Django asynchronously via our messaging system.
    """

    # Report that this task is now being handled
    messaging.send_task_running_message(ml_request.data.task_id)

    result = ml_endpoint(ml_request.data, response)

    messaging.send_ml_training_completion_message(result)

    # Regardless of the result, we always return HTTP 200 to Dapr;
    # training failures are handled by the consumer of the completion message.
    response.status_code = 200

    return response
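
How these routes are bound to topics is not visible here; the service may rely on declarative subscription manifests. Purely for illustration, a Dapr programmatic subscription mapping hypothetical topics onto the two event-driven routes would look like this (the pubsub component and topic names are assumptions):

# Illustrative only, not part of application.py: Dapr sidecars call
# GET /dapr/subscribe at startup and use the returned list to route
# published messages to the given HTTP paths.
@app.get("/dapr/subscribe")
def dapr_subscribe():
    return [
        {"pubsubname": "pubsub", "topic": "idaes-solve", "route": "/solve_model"},
        {"pubsubname": "pubsub", "topic": "ml-train", "route": "/ml_train"},
    ]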

def initialise_tracing():
    """
    Initialize OpenTelemetry tracing with OTLP exporter and FastAPI instrumentation.
    """

    resource = Resource.create(attributes={
        SERVICE_NAME: settings.OTLP_SERVICE_NAME
    })

    tracer_provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=settings.OTLP_ENDPOINT))
    tracer_provider.add_span_processor(processor)
    trace.set_tracer_provider(tracer_provider)

    FastAPIInstrumentor.instrument_app(app)
    LoggingInstrumentor().instrument(set_logging_format=True)

initialise_tracing()
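
With the tracer provider registered above, other modules in the service can attach their own spans to the same trace alongside the automatic FastAPI and logging instrumentation. A minimal sketch; the span and attribute names are illustrative:

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

def traced_solve(task_id: str):
    # Opens a child span under the active request span created by FastAPIInstrumentor.
    with tracer.start_as_current_span("idaes.solve") as span:
        span.set_attribute("task.id", task_id)
        ...  # actual solve work would run here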