Coverage for backend/ahuora-builder/src/ahuora_builder/ml_wizard.py: 98%
54 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1import contextlib
2import json
3from dataclasses import dataclass
4from io import StringIO
5from typing import Any
7import pandas as pd
8import numpy as np
9from sklearn.model_selection import train_test_split
10from sklearn.metrics import mean_squared_error, r2_score
11from idaes.core.surrogate.pysmo_surrogate import PysmoRBFTrainer, PysmoSurrogate
13from ahuora_builder_types.payloads.ml_request_schema import (
14 MLTrainingChartPayload as MLChart,
15 MLTrainingRegressionMetricPayload as RegressionMetric,
16 MLTrainingResultPayload as MLResult,
17)
20MAX_QQ_CHART_POINTS = 250
@dataclass
class TrainingOutput:
    """Bundle of ML training artefacts held before object-storage upload.

    Attributes:
        result_payload: Result payload carrying the serialised surrogate
            model together with its charts and metrics.
        test_inputs_df: Input columns of the held-out test rows.
        test_outputs_df: Output columns produced by evaluating the surrogate
            on those held-out test rows.
    """

    result_payload: MLResult
    test_inputs_df: pd.DataFrame
    test_outputs_df: pd.DataFrame
def _json_indexed_records(frame: pd.DataFrame) -> dict[str, dict[str, float]]:
    """Return the dataframe as nested dicts keyed by row index, then column.

    Row index values and column names are converted with ``str`` and every
    cell is converted with ``float``, so the result holds only strings and
    Python floats.
    """
    records: dict[str, dict[str, float]] = {}
    for row_key, cells in frame.to_dict(orient="index").items():
        records[str(row_key)] = {
            str(name): float(cell) for name, cell in cells.items()
        }
    return records
def train_dataframe(
    df: pd.DataFrame,
    *,
    input_labels: list[str],
    output_labels: list[str],
) -> TrainingOutput:
    """Train a PySMO surrogate model from a dataframe and return upload-ready artefacts.

    The rows are split 80/20 into training and test sets with a fixed seed.
    A Gaussian-basis RBF surrogate is trained on the training rows and then
    evaluated on the held-out rows to build one chart and one regression
    metric entry per output label.

    Args:
        df: Data holding every column named in ``input_labels`` and
            ``output_labels``.
        input_labels: Column names used as surrogate inputs.
        output_labels: Column names used as surrogate outputs.

    Returns:
        A ``TrainingOutput`` with the result payload (serialised model,
        charts, metrics), the test-set input columns, and the surrogate's
        predictions for the test set. The payload's bucket, key and timing
        fields are left empty here.
    """
    train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

    trainer = PysmoRBFTrainer(
        input_labels=input_labels,
        output_labels=output_labels,
        training_dataframe=train_df,
    )
    trainer.config.basis_function = 'gaussian'

    # Train surrogate (calls PySMO through IDAES Python wrapper); anything
    # printed to stdout during training is captured and discarded.
    with contextlib.redirect_stdout(StringIO()):
        rbf_train = trainer.train_surrogate()

    # create callable surrogate model
    rbf_surr = PysmoSurrogate(
        rbf_train, input_labels, output_labels, input_bounds=None)

    # Serialise the surrogate into an in-memory buffer and parse it back so
    # the payload carries a JSON object rather than a string.
    buffer = StringIO()
    rbf_surr.save(buffer)
    json_data = json.loads(buffer.getvalue())

    df_evaluate = rbf_surr.evaluate_surrogate(test_df)

    metrics: list[RegressionMetric] = []
    charts: list[MLChart] = []

    for output_label in output_labels:
        actual = test_df[output_label]
        predicted = df_evaluate[output_label]

        charts.append(compute_chart(actual, predicted, output_label))
        # scikit-learn metrics take (y_true, y_pred). The order matters for
        # R^2, which normalises by the variance of the true values.
        metrics.append(
            RegressionMetric(
                mean_squared_error=round(
                    mean_squared_error(actual, predicted), 4),
                r2_score=round(r2_score(actual, predicted), 4),
            )
        )

    return TrainingOutput(
        result_payload=MLResult(
            surrogate_model=json_data,
            charts=charts,
            metrics=metrics,
            test_results_bucket="",
            test_results_key="",
            timing={},
        ),
        test_inputs_df=test_df[input_labels].copy(),
        test_outputs_df=df_evaluate[output_labels].copy(),
    )
def compute_chart(
    test_data_df: pd.Series,
    eval_data_df: pd.Series,
    output_label: str,
) -> MLChart:
    """Assemble the chart payload for a single output label.

    The payload holds the QQ-plot points for the two series plus the smallest
    and largest value found across both, each rounded to four decimal places.
    """
    lows = [np.min(test_data_df), np.min(eval_data_df)]
    highs = [np.max(test_data_df), np.max(eval_data_df)]

    return MLChart(
        min=round(np.min(lows), 4),
        max=round(np.max(highs), 4),
        qq_plot_data=compute_qq_coordinates(test_data_df, eval_data_df),
        output_label=output_label,
    )
def compute_qq_coordinates(test_data: pd.Series, eval_data: pd.Series) -> str:
    """Return QQ-plot points for two datasets as a JSON string.

    Both datasets are evaluated at the same evenly spaced quantile levels
    between 0 and 1. The number of levels is the length of ``test_data``,
    capped at ``MAX_QQ_CHART_POINTS``. Each point is a ``{"x", "y"}`` object
    where ``x`` comes from ``test_data`` and ``y`` from ``eval_data``, both
    rounded to four decimal places.
    """
    # np.sort returns sorted copies, so the input Series are not modified.
    sorted_test = np.sort(test_data.to_numpy().flatten())
    sorted_eval = np.sort(eval_data.to_numpy().flatten())

    point_count = min(len(sorted_test), MAX_QQ_CHART_POINTS)
    levels = np.linspace(0, 1, point_count)

    points = []
    for test_q, eval_q in zip(
        np.quantile(sorted_test, levels),
        np.quantile(sorted_eval, levels),
    ):
        points.append({"x": round(float(test_q), 4), "y": round(float(eval_q), 4)})
    return json.dumps(points)
141 return json.dumps(qq_data)