Coverage for backend/common/src/common/services/csv.py: 84%
91 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1import csv
2import io
3from dataclasses import dataclass
# Field separators the CSV helpers in this module accept.
SUPPORTED_DELIMITERS = (",", ";")

# Separators that are not accepted; detect_delimiter looks for them so it can
# reject such files instead of treating them as single-column CSVs.
UNSUPPORTED_DELIMITER_HINTS = ("|", "\t")

# Delimiter reported for a file that contains no separator characters at all.
DEFAULT_SINGLE_COLUMN_DELIMITER = ","

# Default number of data rows returned by inspect_csv_sample.
DEFAULT_PREVIEW_ROWS = 10

# 1 MiB. NOTE(review): not referenced in this module's visible code; presumably
# the amount of data callers read before inspecting a file -- confirm.
DEFAULT_INSPECTION_BYTES = 1024 * 1024
class CsvInspectionError(ValueError):
    """Raised when CSV content cannot be decoded, inspected or parsed.

    Subclasses ValueError, so handlers written for ValueError also catch it.
    """
17@dataclass(slots=True)
18class CsvInspectionResult:
19 headers: list[str]
20 delimiter: str
21 preview_rows: list[dict[str, str]]
22 warnings: list[str]
def decode_csv_bytes(sample_bytes: bytes) -> str:
    """Decode raw CSV bytes as UTF-8 text, dropping a leading BOM if present.

    Args:
        sample_bytes: Raw bytes of the CSV content.

    Returns:
        The decoded text.

    Raises:
        CsvInspectionError: If the bytes are not valid UTF-8.
    """
    try:
        # "utf-8-sig" behaves like UTF-8 but strips an optional byte-order mark.
        decoded = sample_bytes.decode("utf-8-sig")
    except UnicodeDecodeError as exc:
        raise CsvInspectionError("CSV files must be UTF-8 encoded text.") from exc
    else:
        return decoded
def detect_delimiter(sample_text: str) -> str:
    """
    Work out which supported delimiter a CSV sample uses.

    The first ten non-blank lines are handed to ``csv.Sniffer``, restricted to
    the supported delimiters. If the sniffer gives up, the header line is
    checked for supported delimiters directly. A file with none of them is
    treated as a single-column CSV (reported as comma-delimited), unless its
    first lines all contain a tab or a pipe, in which case it is rejected.

    Raises:
        CsvInspectionError: If the sample has no non-blank lines, or the
            delimiter is not one of the supported ones.
    """
    non_blank = [text for text in sample_text.splitlines() if text.strip()]
    if not non_blank:
        raise CsvInspectionError("The uploaded CSV is empty.")

    sniff_input = "\n".join(non_blank[:10])
    allowed = "".join(SUPPORTED_DELIMITERS)
    try:
        detected = csv.Sniffer().sniff(sniff_input, delimiters=allowed).delimiter
    except csv.Error:
        # Sniffer could not decide; fall back to counting on the header line.
        header = non_blank[0]
        occurrences = {candidate: header.count(candidate) for candidate in SUPPORTED_DELIMITERS}
        if any(occurrences.values()):
            detected = max(occurrences, key=occurrences.get)
        else:
            # No supported delimiter at all: either a single-column file or a
            # file using an unsupported separator on each of its first lines.
            leading = non_blank[:2]
            for hint in UNSUPPORTED_DELIMITER_HINTS:
                if all(hint in text for text in leading):
                    raise CsvInspectionError("CSV delimiter must be either ',' or ';'.")
            detected = DEFAULT_SINGLE_COLUMN_DELIMITER

    # Defensive check in case the sniffer ever reports something unexpected.
    if detected not in SUPPORTED_DELIMITERS:
        raise CsvInspectionError("CSV delimiter must be either ',' or ';'.")

    return detected
def inspect_csv_sample(sample_bytes: bytes, preview_rows: int = DEFAULT_PREVIEW_ROWS) -> CsvInspectionResult:
    """Decode a CSV sample and report its headers, delimiter and leading rows.

    Args:
        sample_bytes: Raw CSV bytes to inspect.
        preview_rows: Maximum number of data rows to include in the preview.
            Zero or a negative value yields an empty preview.

    Returns:
        A CsvInspectionResult with the non-empty header names, the detected
        delimiter, up to ``preview_rows`` rows and an empty warnings list.

    Raises:
        CsvInspectionError: If the bytes are not UTF-8, the sample is empty,
            the delimiter is unsupported, or no non-empty header is found.
    """
    sample_text = decode_csv_bytes(sample_bytes)
    delimiter = detect_delimiter(sample_text)
    reader = csv.DictReader(io.StringIO(sample_text), delimiter=delimiter)

    # Blank header cells (e.g. from a trailing delimiter) are not real columns.
    headers = [header for header in (reader.fieldnames or []) if header]
    if not headers:
        raise CsvInspectionError("The uploaded CSV must include a header row.")

    preview: list[dict[str, str]] = []
    # Guard up front: checking the limit only after appending would return one
    # row even when the caller asked for none.
    if preview_rows > 0:
        for row in reader:
            # DictReader stores None for cells missing from a short row, so the
            # key exists and a .get() default would never apply; normalise to
            # "" to honour the dict[str, str] contract.
            preview.append({header: row.get(header) or "" for header in headers})
            if len(preview) >= preview_rows:
                break

    return CsvInspectionResult(
        headers=headers,
        delimiter=delimiter,
        preview_rows=preview,
        warnings=[],
    )
def stream_csv_rows(binary_stream, delimiter: str):
    """Wrap a binary stream in a CSV dict reader and return its headers.

    Args:
        binary_stream: Binary file-like object holding UTF-8 CSV data.
        delimiter: Field separator; must be one of the supported delimiters.

    Returns:
        A ``(headers, reader)`` tuple: the non-empty header names and the
        ``csv.DictReader`` positioned after the header row.

    Raises:
        CsvInspectionError: If the delimiter is unsupported or the header row
            has no non-empty column names.
    """
    if delimiter not in SUPPORTED_DELIMITERS:
        raise CsvInspectionError("CSV delimiter must be either ',' or ';'.")

    # newline="" leaves line-ending handling to the csv module.
    row_reader = csv.DictReader(
        io.TextIOWrapper(binary_stream, encoding="utf-8-sig", newline=""),
        delimiter=delimiter,
    )

    # Reading .fieldnames consumes the header row from the stream.
    column_names = [name for name in (row_reader.fieldnames or []) if name]
    if column_names:
        return column_names, row_reader

    raise CsvInspectionError("The uploaded CSV must include a header row.")
104def parse_float_cell(
105 row_number: int,
106 row: dict[str, str | None],
107 column_name: str,
108 *,
109 required: bool,
110) -> float | None:
111 raw_value = (row.get(column_name) or "").strip()
112 if not raw_value:
113 if required: 113 ↛ 114line 113 didn't jump to line 114 because the condition on line 113 was never true
114 raise CsvInspectionError(f"Row {row_number}, column '{column_name}' is empty.")
115 return None
117 try:
118 return float(raw_value)
119 except ValueError as exc:
120 raise CsvInspectionError(
121 f"Row {row_number}, column '{column_name}' must contain a numeric value."
122 ) from exc
def parse_numeric_row(row_number: int, row: dict[str, str | None], headers: list[str]) -> list[float]:
    """Parse every listed column of a row as a required float.

    Args:
        row_number: Row number used in error messages.
        row: Mapping of column name to raw cell text (or None).
        headers: Columns to parse, in output order.

    Returns:
        The parsed values, one per entry in ``headers``.

    Raises:
        CsvInspectionError: If any listed cell is empty or not numeric.
    """
    values: list[float] = []
    for column in headers:
        values.append(parse_float_cell(row_number, row, column, required=True))
    return values
132def filter_numeric_headers_by_first_row(
133 headers: list[str],
134 row: dict[str, str | None],
135) -> tuple[list[str], list[str]]:
136 numeric_headers: list[str] = []
137 skipped_headers: list[str] = []
139 for header in headers:
140 raw_value = (row.get(header) or "").strip()
141 if not raw_value: 141 ↛ 142line 141 didn't jump to line 142 because the condition on line 141 was never true
142 skipped_headers.append(header)
143 continue
145 try:
146 float(raw_value)
147 except ValueError:
148 skipped_headers.append(header)
149 continue
151 numeric_headers.append(header)
153 return numeric_headers, skipped_headers