# Coverage for backend/django/core/auxiliary/services/object_storage/s3.py: 77% (98 statements)

import hashlib
import re
from datetime import UTC, datetime
from functools import lru_cache

import boto3
from botocore.client import BaseClient
from botocore.config import Config
from botocore.exceptions import ClientError
from botocore.response import StreamingBody
from django.conf import settings

from core.auxiliary.services.upload_types import (
    CompletedMultipartPart,
    CompletedMultipartUploadStorageResult,
    UploadedPart,
)

DEFAULT_REGION = "us-east-1"


def get_bucket_name() -> str:
    """Return the configured default object-storage bucket name."""
    bucket = settings.SEAWEED_S3_BUCKET
    if not bucket:
        raise RuntimeError("SEAWEED_S3_BUCKET is not configured.")
    return bucket


def _client_config() -> Config:
    """Build a botocore config for the configured S3-compatible object store."""
    config_kwargs = {"signature_version": "s3v4"}
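    # Path-style addressing (http://endpoint/bucket/key) is generally needed
    # for S3-compatible stores such as SeaweedFS, which may not resolve
    # virtual-hosted bucket subdomains.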
    if settings.SEAWEED_S3_FORCE_PATH_STYLE:
        config_kwargs["s3"] = {"addressing_style": "path"}
    return Config(**config_kwargs)


def _build_client(endpoint_url: str) -> BaseClient:
    """Create an S3 client for the provided internal or public endpoint URL."""
    return boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id=settings.SEAWEED_S3_ACCESS_KEY,
        aws_secret_access_key=settings.SEAWEED_S3_SECRET_KEY,
        region_name=settings.SEAWEED_S3_REGION or DEFAULT_REGION,
        config=_client_config(),
    )
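

# The two getters below cache one client per process via lru_cache. boto3
# documents low-level clients as generally thread-safe, so sharing a cached
# client across threads is assumed to be safe for these operations.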
@lru_cache(maxsize=1)
def get_s3_client() -> BaseClient:
    """Return the cached internal S3 client used for storage operations."""
    endpoint_url = settings.SEAWEED_S3_ENDPOINT
    if not endpoint_url:
        raise RuntimeError("SEAWEED_S3_ENDPOINT is not configured.")
    return _build_client(endpoint_url)


@lru_cache(maxsize=1)
def get_presign_client() -> BaseClient:
    """Return the cached S3 client used to generate browser-facing presigned URLs."""
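    # SigV4 signatures cover the request host, so presigned URLs only verify
    # against the endpoint they were generated for; this client is therefore
    # bound to the browser-reachable public endpoint when one is configured.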
    endpoint_url = settings.SEAWEED_S3_PUBLIC_ENDPOINT or settings.SEAWEED_S3_ENDPOINT
    if not endpoint_url:
        raise RuntimeError("SEAWEED_S3_PUBLIC_ENDPOINT or SEAWEED_S3_ENDPOINT must be configured.")
    return _build_client(endpoint_url)


def _lifecycle_rule_id(prefix: str) -> str:
    """Build a deterministic lifecycle rule identifier for a prefix."""
    return f"expire-{hashlib.sha1(prefix.encode('utf-8')).hexdigest()[:16]}"


def _get_lifecycle_rules(bucket: str) -> list[dict]:
    """Fetch lifecycle rules for a bucket, tolerating an empty configuration."""
    try:
        response = get_s3_client().get_bucket_lifecycle_configuration(Bucket=bucket)
    except ClientError as exc:
        error_code = exc.response.get("Error", {}).get("Code")
        if error_code == "NoSuchLifecycleConfiguration":
            return []
        raise
    return list(response.get("Rules", []))
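

# Note: put_bucket_lifecycle_configuration replaces the bucket's entire
# lifecycle configuration, so the read-modify-write in _put_lifecycle_rule is
# not atomic; concurrent writers could drop each other's rules. Callers that
# schedule expirations at high frequency may want external serialization.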
def _put_lifecycle_rule(bucket: str, rule: dict) -> None:
    """Upsert a lifecycle rule on the bucket."""
    rules = [existing for existing in _get_lifecycle_rules(bucket) if existing.get("ID") != rule["ID"]]
    rules.append(rule)
    get_s3_client().put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration={"Rules": rules},
    )


def _sanitize_content_disposition_filename(filename: str) -> str:
    """Strip characters that could break or inject into a Content-Disposition header."""
    sanitized = re.sub(r'[\r\n"\\;]', "_", filename).strip()
    return sanitized or "download"


def schedule_object_expiration(bucket: str, key: str, expires_at: datetime) -> None:
    """Schedule expiration for a single object using an exact-key prefix rule."""
    # S3-compatible lifecycle prefix filters are starts-with matches rather than strict
    # equality checks. We use the full object key here; because upload keys include a
    # UUID segment, the effective match remains unique for our generated objects.
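    # Caveat: stock AWS S3 requires lifecycle Expiration dates to fall at
    # midnight UTC; the configured S3-compatible store is assumed to accept
    # the arbitrary (second-truncated) timestamp passed below.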
    _put_lifecycle_rule(
        bucket,
        {
            "ID": _lifecycle_rule_id(key),
            "Status": "Enabled",
            "Filter": {"Prefix": key},
            "Expiration": {"Date": expires_at.astimezone(UTC).replace(microsecond=0)},
        },
    )


def create_multipart_upload(bucket: str, key: str, content_type: str) -> str:
    """Start a multipart upload and return the storage upload id."""
    response = get_s3_client().create_multipart_upload(
        Bucket=bucket,
        Key=key,
        ContentType=content_type or "text/csv",
    )
    return response["UploadId"]


def presign_upload_part(
    bucket: str,
    key: str,
    upload_id: str,
    part_number: int,
    expires_seconds: int,
) -> str:
    """Generate a presigned URL for one multipart upload part."""
    return get_presign_client().generate_presigned_url(
        "upload_part",
        Params={
            "Bucket": bucket,
            "Key": key,
            "UploadId": upload_id,
            "PartNumber": part_number,
        },
        ExpiresIn=expires_seconds,
    )


def presign_download_url(
    bucket: str,
    key: str,
    filename: str,
    expires_seconds: int = 3600,
) -> str:
    """Generate a presigned URL for downloading an object from S3-compatible storage."""
    safe_filename = _sanitize_content_disposition_filename(filename)
    return get_presign_client().generate_presigned_url(
        "get_object",
        Params={
            "Bucket": bucket,
            "Key": key,
            "ResponseContentDisposition": f'attachment; filename="{safe_filename}"',
        },
        ExpiresIn=expires_seconds,
    )


def list_uploaded_parts(bucket: str, key: str, upload_id: str) -> list[UploadedPart]:
    """Return all uploaded multipart parts currently stored for an upload id."""
    client = get_s3_client()
    parts: list[UploadedPart] = []
    part_number_marker = 0
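
    # list_parts pages its results (at most 1000 parts per response); follow
    # NextPartNumberMarker until IsTruncated comes back false.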
    while True:
        response = client.list_parts(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            PartNumberMarker=part_number_marker,
        )
        parts.extend(
            UploadedPart(
                part_number=part["PartNumber"],
                etag=part["ETag"],
                size_bytes=part["Size"],
            )
            for part in response.get("Parts", [])
        )
        if not response.get("IsTruncated"):
            break
        part_number_marker = response.get("NextPartNumberMarker", 0)

    return parts


def complete_multipart_upload(
    bucket: str,
    key: str,
    upload_id: str,
    parts: list[CompletedMultipartPart],
) -> CompletedMultipartUploadStorageResult:
    """Finalize a multipart upload and return the storage completion metadata."""
    sorted_parts = sorted(parts, key=lambda part: part.part_number)
    response = get_s3_client().complete_multipart_upload(
        Bucket=bucket,
        Key=key,
        UploadId=upload_id,
        MultipartUpload={
            "Parts": [
                {"PartNumber": part.part_number, "ETag": part.etag}
                for part in sorted_parts
            ]
        },
    )
    return CompletedMultipartUploadStorageResult(
        bucket=bucket,
        object_key=key,
        etag=response.get("ETag"),
        location=response.get("Location"),
    )


def abort_multipart_upload(bucket: str, key: str, upload_id: str) -> None:
    """Abort a multipart upload that should no longer accept browser parts."""
    get_s3_client().abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)


def get_range(bucket: str, key: str, byte_range: str) -> bytes:
    """Read a byte range from an object for lightweight CSV inspection."""
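    # byte_range uses the HTTP Range syntax without the "bytes=" prefix,
    # e.g. "0-1023" for the first KiB.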
    response = get_s3_client().get_object(Bucket=bucket, Key=key, Range=f"bytes={byte_range}")
    return response["Body"].read()


def stream_object(bucket: str, key: str) -> StreamingBody:
    """Open a streaming reader for the full object body."""
    response = get_s3_client().get_object(Bucket=bucket, Key=key)
    return response["Body"]


def object_exists(bucket: str, key: str) -> bool:
    """Return whether the configured object is still available in object storage."""
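    # HEAD error responses carry no body, so AWS surfaces missing objects as
    # "404"/"NotFound"; "NoSuchKey" covers S3-compatible stores that report
    # the GET-style error code instead.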
    try:
        get_s3_client().head_object(Bucket=bucket, Key=key)
    except ClientError as exc:
        error_code = exc.response.get("Error", {}).get("Code")
        if error_code in {"404", "NoSuchKey", "NotFound"}:
            return False
        raise
    return True


def delete_object(bucket: str, key: str) -> None:
    """Delete an object from S3-compatible object storage."""
    get_s3_client().delete_object(Bucket=bucket, Key=key)
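

# A minimal usage sketch of the browser multipart flow these helpers support.
# Everything below is illustrative, not part of this module's public API: the
# single-part flow, the placeholder ETag, and keyword construction of
# CompletedMultipartPart are assumptions about the real calling code.
def _example_multipart_flow(bucket: str, key: str) -> None:
    upload_id = create_multipart_upload(bucket, key, "text/csv")
    try:
        # Hand the browser one presigned URL per part; it PUTs the bytes
        # directly to object storage and reports back each part's ETag.
        upload_url = presign_upload_part(
            bucket, key, upload_id, part_number=1, expires_seconds=900
        )
        print(f"PUT part 1 to {upload_url}")
        # ... once the browser finishes, echo its ETags back to storage ...
        parts = [CompletedMultipartPart(part_number=1, etag='"example-etag"')]
        complete_multipart_upload(bucket, key, upload_id, parts)
    except Exception:
        # Abort so the store reclaims any parts that were already uploaded.
        abort_multipart_upload(bucket, key, upload_id)
        raise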