From d4484b665f8fd527cad3f998b3b349a6db81b89f Mon Sep 17 00:00:00 2001 From: Diego Ripley Date: Fri, 13 Mar 2026 07:16:47 -0400 Subject: [PATCH] Updates to Cloudflare HTTP ingestor. I tried to calculate different file hashes and failed miserably --- scripts/05_cloudflare_http_ingestor/README.md | 106 ++++- .../05_cloudflare_http_ingestor/src/index.js | 412 ++++++++++++------ .../05_cloudflare_http_ingestor/wrangler.toml | 3 +- 3 files changed, 372 insertions(+), 149 deletions(-) diff --git a/scripts/05_cloudflare_http_ingestor/README.md b/scripts/05_cloudflare_http_ingestor/README.md index ab86ccd..88f0ef3 100644 --- a/scripts/05_cloudflare_http_ingestor/README.md +++ b/scripts/05_cloudflare_http_ingestor/README.md @@ -12,6 +12,11 @@ Client POST ──▶ Worker ──stream──▶ S3 PutObject / Multipart ├─ Auth check (Bearer token) ├─ Fetch source URL (custom User-Agent) └─ Sign with AWS Sig V4 (aws4fetch) + +Client PUT ──▶ Worker ──stream──▶ S3 PutObject / Multipart + │ + ├─ Auth check (Bearer token) + └─ Direct binary upload (X-S3-Key header) ``` **Two upload paths are used automatically:** @@ -19,7 +24,7 @@ Client POST ──▶ Worker ──stream──▶ S3 PutObject / Multipart | Condition | Upload method | Memory overhead | |---|---|---| | Known size ≤ 100 MiB | Single streaming `PUT` | ~0 (pipe-through) | -| Unknown size **or** > 100 MiB | Multipart upload in 5 MiB chunks | ≤ 5 MiB | +| Unknown size **or** > 100 MiB | Multipart upload in 25 MiB chunks | ≤ 25 MiB | > Files larger than 100 MiB always use multipart upload because Cloudflare > Workers enforce a body-size limit on single outbound `fetch()` requests. @@ -78,7 +83,9 @@ pnpm run deploy ## Usage -### Request +### Download mode (POST) + +Downloads a file from a URL and uploads it to S3. 
**Method:** `POST` **Content-Type:** `application/json` @@ -92,10 +99,10 @@ pnpm run deploy | `user_agent` | Yes | User-Agent string for the download request | | `key_prefix` | No | Destination path within the S3 bucket | -### Example +#### Example ```bash -curl -X POST https://cf-data-ingestor..workers.dev \ +curl -X POST https://cf-data-ingestor.labs.dataforcanada.org \ -H "Authorization: Bearer <token>" \ -H "Content-Type: application/json" \ -d '{ @@ -105,30 +112,103 @@ curl -X POST https://cf-data-ingestor..workers.dev \ }' ``` -### Successful response +#### Successful response ```json { "ok": true, "bucket": "us-west-2.opendata.source.coop", - "key": "dataforcanada/d4c-datapkg-orthoimagery/archive/ca-qc_government_and_municipalities_of_quebec-2026A000224_d4c-datapkg-orthoimagery_orthorectified_imagery_from_quebec/mos_14_31n02_se_30cm_f09.JP2", + "key": "dataforcanada/d4c-datapkg-orthoimagery/archive/ca-qc_government_and_municipalities_of_quebec-2026A000224_d4c-datapkg-orthoimagery_orthorectified_imagery_from_quebec/mos_14_31n02_se_30cm_f09.JP2", "content_type": "application/x-msdownload", - "size_bytes": 773722941 + "size_bytes": 773722941, + "etag": "abc123def456", + "multipart_part_size": 26214400, + "multipart_number_parts": 30, + "started_at": "2026-03-12T21:00:00.000Z", + "finished_at": "2026-03-12T21:01:30.000Z" } ``` +> `multipart_part_size` and `multipart_number_parts` are only present when multipart upload was used (file > 100 MiB or unknown size). + +### Direct upload mode (PUT) + +Uploads a binary file body directly to S3. Useful for uploading local files +(e.g. Parquet artifacts) without needing a public download URL. + +**Method:** `PUT` +**Authorization:** `Bearer <token>` + +**Required headers:** + +| Header | Description | +|---|---| +| `X-S3-Key` | Full S3 object key (e.g. 
`dataforcanada/my-dataset/data.parquet`) | + +**Optional headers:** + +| Header | Description | +|---|---| +| `Content-Type` | MIME type (default: `application/octet-stream`) | +| `Content-Length` | File size in bytes (enables single PUT for files ≤ 100 MiB) | + +**Body:** Raw binary file content. + +#### Example + +```bash +curl -X PUT https://cf-data-ingestor.labs.dataforcanada.org \ + -H "Authorization: Bearer <token>" \ + -H "X-S3-Key: dataforcanada/my-dataset/downloads.parquet" \ + -H "Content-Type: application/octet-stream" \ + -H "Content-Length: $(stat -c%s downloads.parquet)" \ + --data-binary @downloads.parquet +``` + +#### Successful response + +```json +{ + "ok": true, + "bucket": "us-west-2.opendata.source.coop", + "key": "dataforcanada/my-dataset/downloads.parquet", + "content_type": "application/octet-stream", + "size_bytes": 45231, + "etag": "def456abc789", + "started_at": "2026-03-12T21:00:00.000Z", + "finished_at": "2026-03-12T21:00:01.000Z" +} +``` + +### Response fields + +| Field | Type | Always present | Description | +|---|---|---|---| +| `ok` | boolean | Yes | `true` on success | +| `bucket` | string | Yes | S3 bucket name | +| `key` | string | Yes | S3 object key | +| `content_type` | string | Yes | MIME type of the uploaded file | +| `size_bytes` | number | When Content-Length known | File size in bytes, as reported by the `Content-Length` header | +| `etag` | string | When available | S3 ETag (quotes stripped) | +| `multipart_part_size` | number | Only for multipart | Part size in bytes (25 MiB) | +| `multipart_number_parts` | number | Only for multipart | Number of parts uploaded | +| `started_at` | string | Yes | ISO-8601 UTC timestamp when processing started | +| `finished_at` | string | Yes | ISO-8601 UTC timestamp when processing finished | + ### Error responses | Status | Meaning | |--------|---------| | 401 | Missing or invalid Bearer token | -| 405 | 
Non-POST/PUT method | +| 415 | Content-Type is not `application/json` (POST only) | +| 400 | Malformed JSON, missing fields, or missing `X-S3-Key` header | | 502 | Source download or S3 upload failed | ## S3 Object Key +### POST mode + Only the **filename** is extracted from the `download_url` and placed under the `key_prefix`. The source URL's directory hierarchy is not preserved. ``` @@ -139,10 +219,14 @@ key_prefix: "dataforcanada/d4c-datapkg-orthoimagery/archive/ca-qc_government_a If `key_prefix` is omitted or empty, the file uploads to the bucket root. +### PUT mode + +The full S3 key is specified directly via the `X-S3-Key` header. + ## Local Development ```bash pnpm run dev ``` -Then POST to `http://localhost:8787`. Wrangler reads secrets from the `.env` file you created in step 3. You can also create environment-specific overrides (e.g. `.env.staging`) — see the [Cloudflare docs](https://developers.cloudflare.com/workers/configuration/secrets/#local-development-with-secrets) for the full `.env` precedence rules. +Then POST or PUT to `http://localhost:8787`. Wrangler reads secrets from the `.env` file you created in step 3. You can also create environment-specific overrides (e.g. `.env.staging`) — see the [Cloudflare docs](https://developers.cloudflare.com/workers/configuration/secrets/#local-development-with-secrets) for the full `.env` precedence rules. 
diff --git a/scripts/05_cloudflare_http_ingestor/src/index.js b/scripts/05_cloudflare_http_ingestor/src/index.js index 50f83fa..368c82b 100644 --- a/scripts/05_cloudflare_http_ingestor/src/index.js +++ b/scripts/05_cloudflare_http_ingestor/src/index.js @@ -3,7 +3,7 @@ import { AwsClient } from "aws4fetch"; // --------------------------------------------------------------------------- // Constants // --------------------------------------------------------------------------- -const MIN_PART_SIZE = 5 * 1024 * 1024; // 5 MiB – S3 minimum for multipart parts +const MIN_PART_SIZE = 25 * 1024 * 1024; // 25 MiB – keeps memory bounded on CF Workers (128 MiB limit) const MAX_SINGLE_PUT_SIZE = 100 * 1024 * 1024; // 100 MiB – above this, always use multipart const MAX_RETRIES = 3; const RETRY_BASE_DELAY_MS = 1000; // 1 second, doubles each retry @@ -93,6 +93,12 @@ function isRetryable(err) { return false; } +/** Strip surrounding double-quotes from an ETag string (S3 returns them quoted). */ +function cleanEtag(etag) { + if (!etag) return null; + return etag.replace(/^"/, "").replace(/"$/, ""); +} + // --------------------------------------------------------------------------- // S3 Upload – Single PUT (streaming, requires known Content-Length) // --------------------------------------------------------------------------- @@ -122,8 +128,9 @@ async function putObjectStreaming(aws, bucket, region, key, body, contentLength, console.error(`[putObjectStreaming] S3 PUT error body: ${text}`); throw new Error(`S3 PUT failed (${resp.status}): ${text}`); } - console.log(`[putObjectStreaming] PUT succeeded`); - return resp; + const etag = resp.headers.get("ETag"); + console.log(`[putObjectStreaming] PUT succeeded, ETag: ${etag}`); + return { resp, etag: cleanEtag(etag) }; } // --------------------------------------------------------------------------- @@ -191,7 +198,6 @@ async function uploadPartWithRetry(aws, bucket, region, key, uploadId, partNumbe let lastErr; for (let attempt = 1; 
attempt <= MAX_RETRIES; attempt++) { try { - // Each retry needs a fresh body since the previous stream may be consumed const etag = await uploadPart(aws, bucket, region, key, uploadId, partNumber, blob, length, endpoint); return etag; } catch (err) { @@ -229,8 +235,11 @@ async function completeMultipart(aws, bucket, region, key, uploadId, parts, endp console.error(`[completeMultipart] Error body: ${text}`); throw new Error(`Complete multipart failed (${resp.status}): ${text}`); } - console.log(`[completeMultipart] Multipart upload completed successfully`); - return resp; + const xml = await resp.text(); + const etagMatch = xml.match(/<ETag>(.+?)<\/ETag>/); + const etag = etagMatch ? etagMatch[1].replace(/&quot;/g, '"') : resp.headers.get("ETag"); + console.log(`[completeMultipart] Multipart upload completed successfully, ETag: ${etag}`); + return { resp, etag: cleanEtag(etag) }; } async function abortMultipart(aws, bucket, region, key, uploadId, endpoint) { @@ -244,17 +253,35 @@ async function abortMultipart(aws, bucket, region, key, uploadId, endpoint) { } } +/** + * Concatenate an array of Uint8Array chunks into a single Uint8Array. + * This replaces the Blob-based approach to avoid holding both the chunk + * array *and* the Blob in memory simultaneously. + */ +function concatChunks(chunks, totalLength) { + const result = new Uint8Array(totalLength); + let offset = 0; + for (let i = 0; i < chunks.length; i++) { + result.set(chunks[i], offset); + offset += chunks[i].byteLength; + chunks[i] = null; // release reference to each chunk eagerly + } + return result; +} + /** * Read from a ReadableStream in ≥ MIN_PART_SIZE chunks and upload each as an * S3 multipart part. Memory usage stays bounded to ~MIN_PART_SIZE at a time. + * + * Returns { etag, totalBytesRead, totalParts }. 
*/ async function multipartStreamUpload(aws, bucket, region, key, stream, contentType, endpoint) { console.log(`[multipartStreamUpload] Starting multipart upload for key: ${key}`); const uploadId = await initiateMultipart(aws, bucket, region, key, contentType, endpoint); const parts = []; let partNumber = 1; - let buffer = []; - let bufferSize = 0; + let chunks = []; + let chunksSize = 0; let totalBytesRead = 0; const reader = stream.getReader(); @@ -267,32 +294,35 @@ async function multipartStreamUpload(aws, bucket, region, key, stream, contentTy break; } - buffer.push(value); - bufferSize += value.byteLength; + chunks.push(value); + chunksSize += value.byteLength; totalBytesRead += value.byteLength; // Flush when we've accumulated enough for a part - if (bufferSize >= MIN_PART_SIZE) { - console.log(`[multipartStreamUpload] Flushing part ${partNumber}, buffer size: ${bufferSize}`); - const blob = new Blob(buffer); - const etag = await uploadPartWithRetry(aws, bucket, region, key, uploadId, partNumber, blob, bufferSize, endpoint); + if (chunksSize >= MIN_PART_SIZE) { + console.log(`[multipartStreamUpload] Flushing part ${partNumber}, buffer size: ${chunksSize}`); + const partBody = concatChunks(chunks, chunksSize); + chunks = []; // release the chunk array immediately + chunksSize = 0; + const etag = await uploadPartWithRetry(aws, bucket, region, key, uploadId, partNumber, partBody, partBody.byteLength, endpoint); parts.push({ partNumber, etag }); partNumber++; - buffer = []; - bufferSize = 0; } } // Upload remaining bytes as the final part - if (bufferSize > 0) { - console.log(`[multipartStreamUpload] Uploading final part ${partNumber}, size: ${bufferSize}`); - const blob = new Blob(buffer); - const etag = await uploadPartWithRetry(aws, bucket, region, key, uploadId, partNumber, blob, bufferSize, endpoint); + if (chunksSize > 0) { + console.log(`[multipartStreamUpload] Uploading final part ${partNumber}, size: ${chunksSize}`); + const partBody = 
concatChunks(chunks, chunksSize); + chunks = []; + chunksSize = 0; + const etag = await uploadPartWithRetry(aws, bucket, region, key, uploadId, partNumber, partBody, partBody.byteLength, endpoint); parts.push({ partNumber, etag }); } - await completeMultipart(aws, bucket, region, key, uploadId, parts, endpoint); - console.log(`[multipartStreamUpload] Upload complete. Total parts: ${parts.length}, total bytes: ${totalBytesRead}`); + const { etag } = await completeMultipart(aws, bucket, region, key, uploadId, parts, endpoint); + console.log(`[multipartStreamUpload] Upload complete. Total parts: ${parts.length}, total bytes: ${totalBytesRead}, ETag: ${etag}`); + return { etag, totalBytesRead, totalParts: parts.length }; } catch (err) { console.error(`[multipartStreamUpload] Upload failed at part ${partNumber}: ${err.message}`); console.error(`[multipartStreamUpload] Error stack: ${err.stack}`); @@ -301,6 +331,45 @@ async function multipartStreamUpload(aws, bucket, region, key, stream, contentTy } } +// --------------------------------------------------------------------------- +// Unified upload helper – decides single PUT vs multipart +// --------------------------------------------------------------------------- + +/** + * Upload a ReadableStream to S3, choosing single PUT or multipart based on + * the known content length. + * + * Returns { etag, useMultipart, totalParts }. + */ +async function doUpload(aws, bucket, region, key, stream, contentType, contentLength, endpoint) { + const numericLength = contentLength ? Number(contentLength) : 0; + const useMultipart = !numericLength || numericLength > MAX_SINGLE_PUT_SIZE; + + console.log(`[doUpload] key=${key}, strategy=${useMultipart ? 
`multipart (size: ${numericLength || "unknown"})` : `single PUT (${numericLength} bytes)`}`); + + if (!useMultipart) { + const result = await putObjectStreaming(aws, bucket, region, key, stream, contentLength, contentType, endpoint); + return { etag: result.etag, useMultipart: false, totalParts: 0 }; + } else { + const result = await multipartStreamUpload(aws, bucket, region, key, stream, contentType, endpoint); + return { etag: result.etag, useMultipart: true, totalParts: result.totalParts }; + } +} + +// --------------------------------------------------------------------------- +// Auth helper +// --------------------------------------------------------------------------- + +/** Validate the Bearer token. Returns null on success, or an error Response. */ +function checkAuth(request, env) { + const authHeader = request.headers.get("Authorization") || ""; + const token = authHeader.startsWith("Bearer ") ? authHeader.slice(7) : ""; + if (!token || token !== env.AUTH_TOKEN) { + return textResponse("Unauthorized", 401); + } + return null; +} + // --------------------------------------------------------------------------- // Main handler // --------------------------------------------------------------------------- @@ -310,141 +379,210 @@ export default { const requestId = crypto.randomUUID(); console.log(`[handler] Request ${requestId} received: ${request.method} ${request.url}`); - // ---- Method check ---- - if (request.method !== "POST") { - console.log(`[handler] ${requestId} rejected: method ${request.method}`); - return textResponse("Method Not Allowed. Use POST.", 405); + // ---- Route by method ---- + if (request.method === "PUT") { + return handleDirectUpload(request, env, requestId); + } + if (request.method === "POST") { + return handleDownloadAndUpload(request, env, requestId); } - // ---- Auth check ---- - const authHeader = request.headers.get("Authorization") || ""; - const token = authHeader.startsWith("Bearer ") ? 
authHeader.slice(7) : ""; - if (!token || token !== env.AUTH_TOKEN) { - console.log(`[handler] ${requestId} rejected: unauthorized`); - return textResponse("Unauthorized", 401); - } + console.log(`[handler] ${requestId} rejected: method ${request.method}`); + return textResponse("Method Not Allowed. Use POST or PUT.", 405); + }, +}; - // ---- Content-Type check ---- - const ct = request.headers.get("Content-Type") || ""; - if (!ct.includes("application/json")) { - console.log(`[handler] ${requestId} rejected: Content-Type "${ct}"`); - return textResponse("Content-Type must be application/json", 415); - } +// --------------------------------------------------------------------------- +// POST handler – download from URL and upload to S3 +// --------------------------------------------------------------------------- - // ---- Parse body ---- - let payload; - try { - payload = await request.json(); - } catch (parseErr) { - console.error(`[handler] ${requestId} JSON parse error: ${parseErr.message}`); - return textResponse("Invalid JSON body", 400); - } +async function handleDownloadAndUpload(request, env, requestId) { + const startedAt = new Date().toISOString(); - const { download_url, user_agent, key_prefix } = payload; - console.log(`[handler] ${requestId} payload: download_url=${download_url}, user_agent=${user_agent}, key_prefix=${key_prefix || "(none)"}`); + // ---- Auth check ---- + const authErr = checkAuth(request, env); + if (authErr) { + console.log(`[handler] ${requestId} rejected: unauthorized`); + return authErr; + } - if (!download_url || !user_agent) { - console.log(`[handler] ${requestId} rejected: missing required fields`); - return jsonResponse( - { error: "'download_url' and 'user_agent' are required." 
}, - 400, - ); - } + // ---- Content-Type check ---- + const ct = request.headers.get("Content-Type") || ""; + if (!ct.includes("application/json")) { + console.log(`[handler] ${requestId} rejected: Content-Type "${ct}"`); + return textResponse("Content-Type must be application/json", 415); + } - // Validate the download URL - let parsedUrl; - try { - parsedUrl = new URL(download_url); - if (!["http:", "https:"].includes(parsedUrl.protocol)) throw new Error(); - } catch { - console.log(`[handler] ${requestId} rejected: invalid download_url`); - return jsonResponse({ error: "Invalid download_url" }, 400); - } + // ---- Parse body ---- + let payload; + try { + payload = await request.json(); + } catch (parseErr) { + console.error(`[handler] ${requestId} JSON parse error: ${parseErr.message}`); + return textResponse("Invalid JSON body", 400); + } - // ---- Fetch the source file (streaming) ---- - console.log(`[handler] ${requestId} fetching source: ${download_url}`); - let sourceResp; - try { - sourceResp = await fetch(download_url, { - headers: { "User-Agent": user_agent }, - redirect: "follow", - }); - } catch (err) { - console.error(`[handler] ${requestId} download fetch error: ${err.message}`); - console.error(`[handler] ${requestId} download fetch stack: ${err.stack}`); - return jsonResponse({ error: `Download failed: ${err.message}` }, 502); - } + const { download_url, user_agent, key_prefix } = payload; + console.log(`[handler] ${requestId} payload: download_url=${download_url}, user_agent=${user_agent}, key_prefix=${key_prefix || "(none)"}`); - console.log(`[handler] ${requestId} source response: status=${sourceResp.status}, Content-Type=${sourceResp.headers.get("Content-Type")}, Content-Length=${sourceResp.headers.get("Content-Length")}`); + if (!download_url || !user_agent) { + console.log(`[handler] ${requestId} rejected: missing required fields`); + return jsonResponse( + { error: "'download_url' and 'user_agent' are required." 
}, + 400, + ); + } - if (!sourceResp.ok) { - console.error(`[handler] ${requestId} source returned non-OK: ${sourceResp.status}`); - return jsonResponse( - { error: `Source returned HTTP ${sourceResp.status}` }, - 502, - ); - } + // Validate the download URL + let parsedUrl; + try { + parsedUrl = new URL(download_url); + if (!["http:", "https:"].includes(parsedUrl.protocol)) throw new Error(); + } catch { + console.log(`[handler] ${requestId} rejected: invalid download_url`); + return jsonResponse({ error: "Invalid download_url" }, 400); + } - // ---- Prepare S3 parameters ---- - const bucket = env.S3_BUCKET; - const region = env.S3_REGION || "us-west-2"; - const endpoint = env.S3_ENDPOINT || ""; - const key = objectKeyFromUrl(download_url, key_prefix || ""); - const sourceContentType = - sourceResp.headers.get("Content-Type") || "application/octet-stream"; - const contentLength = sourceResp.headers.get("Content-Length"); + // ---- Fetch the source file (streaming) ---- + console.log(`[handler] ${requestId} fetching source: ${download_url}`); + let sourceResp; + try { + sourceResp = await fetch(download_url, { + headers: { "User-Agent": user_agent }, + redirect: "follow", + }); + } catch (err) { + console.error(`[handler] ${requestId} download fetch error: ${err.message}`); + console.error(`[handler] ${requestId} download fetch stack: ${err.stack}`); + return jsonResponse({ error: `Download failed: ${err.message}` }, 502); + } - const numericLength = contentLength ? 
Number(contentLength) : 0; - const useMultipart = !numericLength || numericLength > MAX_SINGLE_PUT_SIZE; + console.log(`[handler] ${requestId} source response: status=${sourceResp.status}, Content-Type=${sourceResp.headers.get("Content-Type")}, Content-Length=${sourceResp.headers.get("Content-Length")}`); - console.log(`[handler] ${requestId} S3 target: bucket=${bucket}, region=${region}, endpoint=${endpoint || "(default)"}, key=${key}`); - console.log(`[handler] ${requestId} upload strategy: ${useMultipart ? `multipart (size: ${numericLength || "unknown"})` : `single PUT (${numericLength} bytes)`}`); + if (!sourceResp.ok) { + console.error(`[handler] ${requestId} source returned non-OK: ${sourceResp.status}`); + return jsonResponse( + { error: `Source returned HTTP ${sourceResp.status}` }, + 502, + ); + } - const aws = makeAwsClient(env); + // ---- Prepare S3 parameters ---- + const bucket = env.S3_BUCKET; + const region = env.S3_REGION || "us-west-2"; + const endpoint = env.S3_ENDPOINT || ""; + const key = objectKeyFromUrl(download_url, key_prefix || ""); + const sourceContentType = + sourceResp.headers.get("Content-Type") || "application/octet-stream"; + const contentLength = sourceResp.headers.get("Content-Length"); - // ---- Upload to S3 ---- - const uploadStart = Date.now(); - try { - if (!useMultipart) { - // Known size ≤ 100 MiB → single streaming PUT (zero extra memory) - await putObjectStreaming( - aws, - bucket, - region, - key, - sourceResp.body, - contentLength, - sourceContentType, - endpoint, - ); - } else { - // Unknown size or > 100 MiB → multipart streaming upload (≤ 5 MiB buffer) - await multipartStreamUpload( - aws, - bucket, - region, - key, - sourceResp.body, - sourceContentType, - endpoint, - ); - } - } catch (err) { - const elapsed = ((Date.now() - uploadStart) / 1000).toFixed(1); - console.error(`[handler] ${requestId} S3 upload failed after ${elapsed}s: ${err.message}`); - console.error(`[handler] ${requestId} S3 upload error stack: 
${err.stack}`); - return jsonResponse({ error: `S3 upload failed: ${err.message}` }, 502); - } + console.log(`[handler] ${requestId} S3 target: bucket=${bucket}, region=${region}, endpoint=${endpoint || "(default)"}, key=${key}`); - const elapsed = ((Date.now() - uploadStart) / 1000).toFixed(1); - console.log(`[handler] ${requestId} upload completed in ${elapsed}s`); + const aws = makeAwsClient(env); + + // ---- Upload to S3 (stream directly, no tee) ---- + try { + const uploadResult = await doUpload(aws, bucket, region, key, sourceResp.body, sourceContentType, contentLength, endpoint); + + const finishedAt = new Date().toISOString(); + const sizeBytes = contentLength ? Number(contentLength) : null; + + console.log(`[handler] ${requestId} upload completed, ETag: ${uploadResult.etag}`); return jsonResponse({ ok: true, bucket, key, content_type: sourceContentType, - ...(contentLength ? { size_bytes: Number(contentLength) } : {}), + ...(sizeBytes !== null ? { size_bytes: sizeBytes } : {}), + ...(uploadResult.etag ? { etag: uploadResult.etag } : {}), + ...(uploadResult.useMultipart ? 
{ + multipart_part_size: MIN_PART_SIZE, + multipart_number_parts: uploadResult.totalParts, + } : {}), + started_at: startedAt, + finished_at: finishedAt, }); - }, -}; + } catch (err) { + const finishedAt = new Date().toISOString(); + console.error(`[handler] ${requestId} S3 upload failed: ${err.message}`); + console.error(`[handler] ${requestId} S3 upload error stack: ${err.stack}`); + return jsonResponse({ + error: `S3 upload failed: ${err.message}`, + started_at: startedAt, + finished_at: finishedAt, + }, 502); + } +} + +// --------------------------------------------------------------------------- +// PUT handler – direct file upload to S3 +// --------------------------------------------------------------------------- + +async function handleDirectUpload(request, env, requestId) { + const startedAt = new Date().toISOString(); + + // ---- Auth check ---- + const authErr = checkAuth(request, env); + if (authErr) { + console.log(`[handler] ${requestId} rejected: unauthorized`); + return authErr; + } + + // ---- Validate required header ---- + const s3Key = request.headers.get("X-S3-Key"); + if (!s3Key) { + console.log(`[handler] ${requestId} rejected: missing X-S3-Key header`); + return jsonResponse({ error: "'X-S3-Key' header is required." }, 400); + } + + const contentType = request.headers.get("Content-Type") || "application/octet-stream"; + const contentLength = request.headers.get("Content-Length"); + + console.log(`[handler] ${requestId} direct upload: key=${s3Key}, Content-Type=${contentType}, Content-Length=${contentLength || "(unknown)"}`); + + if (!request.body) { + console.log(`[handler] ${requestId} rejected: no body`); + return jsonResponse({ error: "Request body is required." 
}, 400); + } + + // ---- Prepare S3 parameters ---- + const bucket = env.S3_BUCKET; + const region = env.S3_REGION || "us-west-2"; + const endpoint = env.S3_ENDPOINT || ""; + const aws = makeAwsClient(env); + + // ---- Upload to S3 (stream directly, no tee) ---- + try { + const uploadResult = await doUpload(aws, bucket, region, s3Key, request.body, contentType, contentLength, endpoint); + + const finishedAt = new Date().toISOString(); + const sizeBytes = contentLength ? Number(contentLength) : null; + + console.log(`[handler] ${requestId} direct upload completed, ETag: ${uploadResult.etag}`); + + return jsonResponse({ + ok: true, + bucket, + key: s3Key, + content_type: contentType, + ...(sizeBytes !== null ? { size_bytes: sizeBytes } : {}), + ...(uploadResult.etag ? { etag: uploadResult.etag } : {}), + ...(uploadResult.useMultipart ? { + multipart_part_size: MIN_PART_SIZE, + multipart_number_parts: uploadResult.totalParts, + } : {}), + started_at: startedAt, + finished_at: finishedAt, + }); + } catch (err) { + const finishedAt = new Date().toISOString(); + console.error(`[handler] ${requestId} direct upload failed: ${err.message}`); + console.error(`[handler] ${requestId} direct upload error stack: ${err.stack}`); + return jsonResponse({ + error: `S3 upload failed: ${err.message}`, + started_at: startedAt, + finished_at: finishedAt, + }, 502); + } +} diff --git a/scripts/05_cloudflare_http_ingestor/wrangler.toml b/scripts/05_cloudflare_http_ingestor/wrangler.toml index 04ab612..91c48f2 100644 --- a/scripts/05_cloudflare_http_ingestor/wrangler.toml +++ b/scripts/05_cloudflare_http_ingestor/wrangler.toml @@ -20,7 +20,8 @@ S3_ENDPOINT = "" [placement] # TODO: parametize so user can specifically set where they want the worker to run -mode = "smart" +#mode = "smart" +hostname = "diffusion.mern.gouv.qc.ca" [observability] enabled = true