mirror of
https://github.com/dataforcanada/d4c-infra-distribution.git
synced 2026-06-13 22:20:55 +02:00
Got it working. Will start running it tomorrow early in the morning, going to chill
This commit is contained in:
@@ -16,10 +16,13 @@ Client POST ──▶ Worker ──stream──▶ S3 PutObject / Multipart
|
||||
|
||||
**Two upload paths are used automatically:**
|
||||
|
||||
| Source provides `Content-Length`? | Upload method | Memory overhead |
|
||||
| Condition | Upload method | Memory overhead |
|
||||
|---|---|---|
|
||||
| Yes | Single streaming `PUT` | ~0 (pipe-through) |
|
||||
| No | Multipart upload in 5 MiB chunks | ≤ 5 MiB |
|
||||
| Known size ≤ 100 MiB | Single streaming `PUT` | ~0 (pipe-through) |
|
||||
| Unknown size **or** > 100 MiB | Multipart upload in 5 MiB chunks | ≤ 5 MiB |
|
||||
|
||||
> Files larger than 100 MiB always use multipart upload because Cloudflare
|
||||
> Workers enforce a body-size limit on single outbound `fetch()` requests.
|
||||
|
||||
## Setup
|
||||
|
||||
|
||||
@@ -4,6 +4,9 @@ import { AwsClient } from "aws4fetch";
|
||||
// Constants
|
||||
// ---------------------------------------------------------------------------
|
||||
const MIN_PART_SIZE = 5 * 1024 * 1024; // 5 MiB – S3 minimum for multipart parts
|
||||
const MAX_SINGLE_PUT_SIZE = 100 * 1024 * 1024; // 100 MiB – above this, always use multipart
|
||||
const MAX_RETRIES = 3;
|
||||
const RETRY_BASE_DELAY_MS = 1000; // 1 second, doubles each retry
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
@@ -62,28 +65,64 @@ function s3Url(bucket, key, region, endpoint) {
|
||||
return `${base}/${bucket}/${encodeURI(key)}`;
|
||||
}
|
||||
|
||||
/**
 * Pause asynchronously for a fixed amount of time.
 *
 * @param {number} ms - Delay in milliseconds, passed straight to `setTimeout`.
 * @returns {Promise<void>} Promise that fulfills (with no value) when the timer fires.
 */
function sleep(ms) {
  return new Promise((wake) => {
    setTimeout(wake, ms);
  });
}
|
||||
|
||||
/**
 * Decide whether a failure looks transient (worth retrying) by inspecting
 * its message text.
 *
 * A failure is considered retryable when the message either
 *  - mentions a network-level problem ("network", "connection", "timeout",
 *    "socket", "econnreset", "fetch failed"), or
 *  - carries a 5xx HTTP status in the form the S3 helpers in this file put
 *    into their errors, e.g. "Upload part 3 failed (503): ...", or in a
 *    "status 503" / "HTTP 503" form.
 *
 * The status must appear in one of those explicit forms. A bare three-digit
 * number starting with 5 is NOT enough, because messages also embed part
 * numbers and response bodies ("Upload part 512 failed (403)" must not be
 * mistaken for a server error).
 *
 * @param {unknown} err - The thrown value: normally an Error, but strings and
 *   null/undefined are tolerated.
 * @returns {boolean} true when the failure looks transient.
 */
function isRetryable(err) {
  // Normalise whatever was thrown into a lowercase message string.
  let raw = "";
  if (err !== null && err !== undefined) {
    raw = typeof err === "string" ? err : err.message || "";
  }
  const msg = String(raw).toLowerCase();

  // Network-level failures
  const networkMarkers = [
    "network",
    "connection",
    "timeout",
    "socket",
    "econnreset",
    "fetch failed",
  ];
  if (networkMarkers.some((marker) => msg.includes(marker))) {
    return true;
  }

  // S3 server-side errors (5xx), anchored to an explicit status position so
  // unrelated numbers in the message cannot trigger a retry.
  return /\(5\d{2}\)|\b(?:http|status)(?: code)?[\s:=]*5\d{2}\b/.test(msg);
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// S3 Upload – Single PUT (streaming, requires known Content-Length)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
 * Upload an object with a single signed PUT request.
 *
 * @param {object} aws - Client exposing `fetch(url, init)`; used to send the request.
 * @param {string} bucket - Destination bucket, forwarded to `s3Url`.
 * @param {string} region - Bucket region, forwarded to `s3Url`.
 * @param {string} key - Object key, forwarded to `s3Url`.
 * @param {*} body - Request body handed to `aws.fetch` unchanged.
 * @param {number|string} contentLength - Sent (stringified) as `Content-Length`.
 * @param {string} [contentType] - Sent as `Content-Type`; falls back to
 *   "application/octet-stream" when falsy.
 * @param {string} [endpoint] - Optional endpoint, forwarded to `s3Url`.
 * @returns {Promise<Response>} The response, when its `ok` flag is set.
 * @throws {Error} "S3 PUT failed (<status>): <body>" when the response is not ok.
 */
async function putObjectStreaming(aws, bucket, region, key, body, contentLength, contentType, endpoint) {
  const targetUrl = s3Url(bucket, key, region, endpoint);
  console.log(`[putObjectStreaming] URL: ${targetUrl}`);
  console.log(`[putObjectStreaming] Content-Type: ${contentType}, Content-Length: ${contentLength}`);

  const requestInit = {
    method: "PUT",
    headers: {
      "Content-Type": contentType || "application/octet-stream",
      "Content-Length": String(contentLength),
      "x-amz-acl": "bucket-owner-full-control",
      // The payload is not hashed; the signature covers headers only.
      "x-amz-content-sha256": "UNSIGNED-PAYLOAD",
    },
    body,
  };

  console.log(`[putObjectStreaming] Sending PUT request...`);
  const response = await aws.fetch(targetUrl, requestInit);
  console.log(`[putObjectStreaming] Response status: ${response.status}`);

  if (response.ok) {
    console.log(`[putObjectStreaming] PUT succeeded`);
    return response;
  }

  // Failure path: surface the response body both in the log and in the error.
  const errorBody = await response.text();
  console.error(`[putObjectStreaming] S3 PUT error body: ${errorBody}`);
  throw new Error(`S3 PUT failed (${response.status}): ${errorBody}`);
}
|
||||
|
||||
@@ -93,62 +132,115 @@ async function putObjectStreaming(aws, bucket, region, key, body, contentLength,
|
||||
|
||||
/**
 * Start an S3 multipart upload and return its upload id.
 *
 * Sends a signed POST to `<object-url>?uploads` and extracts the
 * `<UploadId>` element from the XML response body.
 *
 * @param {object} aws - Client exposing `fetch(url, init)`.
 * @param {string} bucket - Destination bucket, forwarded to `s3Url`.
 * @param {string} region - Bucket region, forwarded to `s3Url`.
 * @param {string} key - Object key, forwarded to `s3Url`.
 * @param {string} [contentType] - Sent as `Content-Type`; falls back to
 *   "application/octet-stream" when falsy.
 * @param {string} [endpoint] - Optional endpoint, forwarded to `s3Url`.
 * @returns {Promise<string>} The upload id found in the response.
 * @throws {Error} "Initiate multipart failed (<status>): <body>" on a non-ok
 *   response, or "Could not parse UploadId from response" when the body has
 *   no `<UploadId>` element.
 */
async function initiateMultipart(aws, bucket, region, key, contentType, endpoint) {
  const url = `${s3Url(bucket, key, region, endpoint)}?uploads`;
  console.log(`[initiateMultipart] URL: ${url}`);

  const resp = await aws.fetch(url, {
    method: "POST",
    headers: {
      "Content-Type": contentType || "application/octet-stream",
      "x-amz-acl": "bucket-owner-full-control",
      "x-amz-content-sha256": "UNSIGNED-PAYLOAD",
    },
  });

  console.log(`[initiateMultipart] Response status: ${resp.status}`);
  if (!resp.ok) {
    const text = await resp.text();
    console.error(`[initiateMultipart] Error body: ${text}`);
    throw new Error(`Initiate multipart failed (${resp.status}): ${text}`);
  }

  const xml = await resp.text();
  const match = xml.match(/<UploadId>(.+?)<\/UploadId>/);
  // Single guard: log the unparseable XML before throwing, so the failure can
  // be diagnosed. (A second, earlier bare-throw guard made this log unreachable.)
  if (!match) {
    console.error(`[initiateMultipart] Could not parse UploadId from XML: ${xml}`);
    throw new Error("Could not parse UploadId from response");
  }

  console.log(`[initiateMultipart] UploadId: ${match[1]}`);
  return match[1];
}
|
||||
|
||||
/**
 * Upload one part of an S3 multipart upload.
 *
 * Sends a signed PUT to `<object-url>?partNumber=N&uploadId=...` and returns
 * the `ETag` response header, which the caller needs to complete the upload.
 *
 * @param {object} aws - Client exposing `fetch(url, init)`.
 * @param {string} bucket - Destination bucket, forwarded to `s3Url`.
 * @param {string} region - Bucket region, forwarded to `s3Url`.
 * @param {string} key - Object key, forwarded to `s3Url`.
 * @param {string} uploadId - Multipart upload id (URI-encoded into the query).
 * @param {number} partNumber - Part number placed in the query string.
 * @param {*} body - Part payload handed to `aws.fetch` unchanged.
 * @param {number} length - Sent (stringified) as `Content-Length`.
 * @param {string} [endpoint] - Optional endpoint, forwarded to `s3Url`.
 * @returns {Promise<string>} The part's ETag as returned by the server.
 * @throws {Error} "Upload part N failed (<status>): <body>" on a non-ok
 *   response, or when an ok response carries no ETag header.
 */
async function uploadPart(aws, bucket, region, key, uploadId, partNumber, body, length, endpoint) {
  const url = `${s3Url(bucket, key, region, endpoint)}?partNumber=${partNumber}&uploadId=${encodeURIComponent(uploadId)}`;
  console.log(`[uploadPart] Part ${partNumber}, size: ${length} bytes, URL: ${url}`);

  const resp = await aws.fetch(url, {
    method: "PUT",
    // One headers object only (a duplicate `headers` key used to shadow the first).
    headers: {
      "Content-Length": String(length),
      "x-amz-content-sha256": "UNSIGNED-PAYLOAD",
    },
    body,
  });

  console.log(`[uploadPart] Part ${partNumber} response status: ${resp.status}`);
  if (!resp.ok) {
    const text = await resp.text();
    console.error(`[uploadPart] Part ${partNumber} error body: ${text}`);
    throw new Error(`Upload part ${partNumber} failed (${resp.status}): ${text}`);
  }

  const etag = resp.headers.get("ETag");
  // Without an ETag the completion request would carry "null" and fail later
  // with a far less obvious error, so fail here instead.
  if (!etag) {
    console.error(`[uploadPart] Part ${partNumber} response had no ETag header`);
    throw new Error(`Upload part ${partNumber} succeeded but the response had no ETag header`);
  }

  console.log(`[uploadPart] Part ${partNumber} ETag: ${etag}`);
  return etag;
}
|
||||
|
||||
/**
 * Upload one multipart part, retrying transient failures.
 *
 * Makes up to MAX_RETRIES attempts through `uploadPart`. After a failed
 * attempt that `isRetryable` accepts (and that is not the last allowed
 * attempt) it waits RETRY_BASE_DELAY_MS * 2^(attempt - 1) milliseconds and
 * tries again; otherwise it stops and rethrows the most recent error.
 * The same `blob` value is passed to `uploadPart` on every attempt.
 *
 * @returns {Promise<*>} Whatever `uploadPart` resolves with (the part's ETag).
 * @throws The error from the last failed attempt.
 */
async function uploadPartWithRetry(aws, bucket, region, key, uploadId, partNumber, blob, length, endpoint) {
  let failure;
  let attempt = 1;

  while (attempt <= MAX_RETRIES) {
    try {
      return await uploadPart(aws, bucket, region, key, uploadId, partNumber, blob, length, endpoint);
    } catch (err) {
      failure = err;

      // Retry only if attempts remain AND the error looks transient.
      const canRetry = attempt < MAX_RETRIES && isRetryable(err);
      if (!canRetry) {
        console.error(`[uploadPartWithRetry] Part ${partNumber} attempt ${attempt} failed (non-retryable or max retries): ${err.message}`);
        break;
      }

      // Exponential backoff: base, 2x base, 4x base, ...
      const delay = RETRY_BASE_DELAY_MS * 2 ** (attempt - 1);
      console.warn(`[uploadPartWithRetry] Part ${partNumber} attempt ${attempt} failed (retryable): ${err.message}. Retrying in ${delay}ms...`);
      await sleep(delay);
    }
    attempt += 1;
  }

  throw failure;
}
|
||||
|
||||
/**
 * Finish an S3 multipart upload by posting the list of uploaded parts.
 *
 * @param {object} aws - Client exposing `fetch(url, init)`.
 * @param {string} bucket - Destination bucket, forwarded to `s3Url`.
 * @param {string} region - Bucket region, forwarded to `s3Url`.
 * @param {string} key - Object key, forwarded to `s3Url`.
 * @param {string} uploadId - Multipart upload id (URI-encoded into the query).
 * @param {Array<{partNumber: number, etag: string}>} parts - Uploaded parts,
 *   serialised in the given order into the CompleteMultipartUpload XML.
 * @param {string} [endpoint] - Optional endpoint, forwarded to `s3Url`.
 * @returns {Promise<Response>} The response, with its body still unread.
 * @throws {Error} "Complete multipart failed (<status>): <body>" when the
 *   response is not ok, or when an ok response body contains an `<Error>`
 *   document.
 */
async function completeMultipart(aws, bucket, region, key, uploadId, parts, endpoint) {
  const partsXml = parts
    .map((p) => `<Part><PartNumber>${p.partNumber}</PartNumber><ETag>${p.etag}</ETag></Part>`)
    .join("");
  const xmlBody = `<CompleteMultipartUpload>${partsXml}</CompleteMultipartUpload>`;
  const url = `${s3Url(bucket, key, region, endpoint)}?uploadId=${encodeURIComponent(uploadId)}`;
  console.log(`[completeMultipart] URL: ${url}, parts: ${parts.length}`);

  const resp = await aws.fetch(url, {
    method: "POST",
    headers: { "Content-Type": "application/xml" },
    body: xmlBody,
  });

  console.log(`[completeMultipart] Response status: ${resp.status}`);
  if (!resp.ok) {
    const text = await resp.text();
    console.error(`[completeMultipart] Error body: ${text}`);
    throw new Error(`Complete multipart failed (${resp.status}): ${text}`);
  }

  // S3 may answer CompleteMultipartUpload with HTTP 200 and an <Error> document
  // in the body, so the status code alone does not prove success. Read a clone
  // so the returned response keeps an unread body for the caller.
  const resultXml = await resp.clone().text();
  if (/<Error[\s>]/.test(resultXml)) {
    console.error(`[completeMultipart] Error body: ${resultXml}`);
    throw new Error(`Complete multipart failed (${resp.status}): ${resultXml}`);
  }

  console.log(`[completeMultipart] Multipart upload completed successfully`);
  return resp;
}
|
||||
|
||||
/**
 * Best-effort abort of an S3 multipart upload.
 *
 * Sends a signed DELETE to `<object-url>?uploadId=...`. This function never
 * throws: both a rejected request and a non-ok response are only logged,
 * so it is safe to call from an error handler.
 *
 * @param {object} aws - Client exposing `fetch(url, init)`.
 * @param {string} bucket - Destination bucket, forwarded to `s3Url`.
 * @param {string} region - Bucket region, forwarded to `s3Url`.
 * @param {string} key - Object key, forwarded to `s3Url`.
 * @param {string} uploadId - Multipart upload id (URI-encoded into the query).
 * @param {string} [endpoint] - Optional endpoint, forwarded to `s3Url`.
 * @returns {Promise<void>}
 */
async function abortMultipart(aws, bucket, region, key, uploadId, endpoint) {
  const url = `${s3Url(bucket, key, region, endpoint)}?uploadId=${encodeURIComponent(uploadId)}`;
  console.log(`[abortMultipart] Aborting upload ${uploadId}, URL: ${url}`);
  try {
    const resp = await aws.fetch(url, { method: "DELETE" });
    // Only claim success when the server actually accepted the abort.
    if (resp.ok) {
      console.log(`[abortMultipart] Abort succeeded`);
    } else {
      console.error(`[abortMultipart] Abort failed (best-effort): HTTP ${resp.status}`);
    }
  } catch (abortErr) {
    // best-effort cleanup: log and swallow so the original upload error is
    // the one that reaches the caller.
    console.error(`[abortMultipart] Abort failed (best-effort): ${abortErr.message}`);
  }
}
|
||||
|
||||
@@ -157,26 +249,33 @@ async function abortMultipart(aws, bucket, region, key, uploadId, endpoint) {
|
||||
* S3 multipart part. Memory usage stays bounded to ~MIN_PART_SIZE at a time.
|
||||
*/
|
||||
async function multipartStreamUpload(aws, bucket, region, key, stream, contentType, endpoint) {
|
||||
console.log(`[multipartStreamUpload] Starting multipart upload for key: ${key}`);
|
||||
const uploadId = await initiateMultipart(aws, bucket, region, key, contentType, endpoint);
|
||||
const parts = [];
|
||||
let partNumber = 1;
|
||||
let buffer = [];
|
||||
let bufferSize = 0;
|
||||
let totalBytesRead = 0;
|
||||
|
||||
const reader = stream.getReader();
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
if (done) {
|
||||
console.log(`[multipartStreamUpload] Stream finished. Total bytes read: ${totalBytesRead}`);
|
||||
break;
|
||||
}
|
||||
|
||||
buffer.push(value);
|
||||
bufferSize += value.byteLength;
|
||||
totalBytesRead += value.byteLength;
|
||||
|
||||
// Flush when we've accumulated enough for a part
|
||||
if (bufferSize >= MIN_PART_SIZE) {
|
||||
console.log(`[multipartStreamUpload] Flushing part ${partNumber}, buffer size: ${bufferSize}`);
|
||||
const blob = new Blob(buffer);
|
||||
const etag = await uploadPart(aws, bucket, region, key, uploadId, partNumber, blob, bufferSize, endpoint);
|
||||
const etag = await uploadPartWithRetry(aws, bucket, region, key, uploadId, partNumber, blob, bufferSize, endpoint);
|
||||
parts.push({ partNumber, etag });
|
||||
partNumber++;
|
||||
buffer = [];
|
||||
@@ -186,13 +285,17 @@ async function multipartStreamUpload(aws, bucket, region, key, stream, contentTy
|
||||
|
||||
// Upload remaining bytes as the final part
|
||||
if (bufferSize > 0) {
|
||||
console.log(`[multipartStreamUpload] Uploading final part ${partNumber}, size: ${bufferSize}`);
|
||||
const blob = new Blob(buffer);
|
||||
const etag = await uploadPart(aws, bucket, region, key, uploadId, partNumber, blob, bufferSize, endpoint);
|
||||
const etag = await uploadPartWithRetry(aws, bucket, region, key, uploadId, partNumber, blob, bufferSize, endpoint);
|
||||
parts.push({ partNumber, etag });
|
||||
}
|
||||
|
||||
await completeMultipart(aws, bucket, region, key, uploadId, parts, endpoint);
|
||||
console.log(`[multipartStreamUpload] Upload complete. Total parts: ${parts.length}, total bytes: ${totalBytesRead}`);
|
||||
} catch (err) {
|
||||
console.error(`[multipartStreamUpload] Upload failed at part ${partNumber}: ${err.message}`);
|
||||
console.error(`[multipartStreamUpload] Error stack: ${err.stack}`);
|
||||
await abortMultipart(aws, bucket, region, key, uploadId, endpoint);
|
||||
throw err;
|
||||
}
|
||||
@@ -204,8 +307,12 @@ async function multipartStreamUpload(aws, bucket, region, key, stream, contentTy
|
||||
|
||||
export default {
|
||||
async fetch(request, env) {
|
||||
const requestId = crypto.randomUUID();
|
||||
console.log(`[handler] Request ${requestId} received: ${request.method} ${request.url}`);
|
||||
|
||||
// ---- Method check ----
|
||||
if (request.method !== "POST") {
|
||||
console.log(`[handler] ${requestId} rejected: method ${request.method}`);
|
||||
return textResponse("Method Not Allowed. Use POST.", 405);
|
||||
}
|
||||
|
||||
@@ -213,12 +320,14 @@ export default {
|
||||
const authHeader = request.headers.get("Authorization") || "";
|
||||
const token = authHeader.startsWith("Bearer ") ? authHeader.slice(7) : "";
|
||||
if (!token || token !== env.AUTH_TOKEN) {
|
||||
console.log(`[handler] ${requestId} rejected: unauthorized`);
|
||||
return textResponse("Unauthorized", 401);
|
||||
}
|
||||
|
||||
// ---- Content-Type check ----
|
||||
const ct = request.headers.get("Content-Type") || "";
|
||||
if (!ct.includes("application/json")) {
|
||||
console.log(`[handler] ${requestId} rejected: Content-Type "${ct}"`);
|
||||
return textResponse("Content-Type must be application/json", 415);
|
||||
}
|
||||
|
||||
@@ -226,12 +335,16 @@ export default {
|
||||
let payload;
|
||||
try {
|
||||
payload = await request.json();
|
||||
} catch {
|
||||
} catch (parseErr) {
|
||||
console.error(`[handler] ${requestId} JSON parse error: ${parseErr.message}`);
|
||||
return textResponse("Invalid JSON body", 400);
|
||||
}
|
||||
|
||||
const { download_url, user_agent, key_prefix } = payload;
|
||||
console.log(`[handler] ${requestId} payload: download_url=${download_url}, user_agent=${user_agent}, key_prefix=${key_prefix || "(none)"}`);
|
||||
|
||||
if (!download_url || !user_agent) {
|
||||
console.log(`[handler] ${requestId} rejected: missing required fields`);
|
||||
return jsonResponse(
|
||||
{ error: "'download_url' and 'user_agent' are required." },
|
||||
400,
|
||||
@@ -244,10 +357,12 @@ export default {
|
||||
parsedUrl = new URL(download_url);
|
||||
if (!["http:", "https:"].includes(parsedUrl.protocol)) throw new Error();
|
||||
} catch {
|
||||
console.log(`[handler] ${requestId} rejected: invalid download_url`);
|
||||
return jsonResponse({ error: "Invalid download_url" }, 400);
|
||||
}
|
||||
|
||||
// ---- Fetch the source file (streaming) ----
|
||||
console.log(`[handler] ${requestId} fetching source: ${download_url}`);
|
||||
let sourceResp;
|
||||
try {
|
||||
sourceResp = await fetch(download_url, {
|
||||
@@ -255,10 +370,15 @@ export default {
|
||||
redirect: "follow",
|
||||
});
|
||||
} catch (err) {
|
||||
console.error(`[handler] ${requestId} download fetch error: ${err.message}`);
|
||||
console.error(`[handler] ${requestId} download fetch stack: ${err.stack}`);
|
||||
return jsonResponse({ error: `Download failed: ${err.message}` }, 502);
|
||||
}
|
||||
|
||||
console.log(`[handler] ${requestId} source response: status=${sourceResp.status}, Content-Type=${sourceResp.headers.get("Content-Type")}, Content-Length=${sourceResp.headers.get("Content-Length")}`);
|
||||
|
||||
if (!sourceResp.ok) {
|
||||
console.error(`[handler] ${requestId} source returned non-OK: ${sourceResp.status}`);
|
||||
return jsonResponse(
|
||||
{ error: `Source returned HTTP ${sourceResp.status}` },
|
||||
502,
|
||||
@@ -274,12 +394,19 @@ export default {
|
||||
sourceResp.headers.get("Content-Type") || "application/octet-stream";
|
||||
const contentLength = sourceResp.headers.get("Content-Length");
|
||||
|
||||
const numericLength = contentLength ? Number(contentLength) : 0;
|
||||
const useMultipart = !numericLength || numericLength > MAX_SINGLE_PUT_SIZE;
|
||||
|
||||
console.log(`[handler] ${requestId} S3 target: bucket=${bucket}, region=${region}, endpoint=${endpoint || "(default)"}, key=${key}`);
|
||||
console.log(`[handler] ${requestId} upload strategy: ${useMultipart ? `multipart (size: ${numericLength || "unknown"})` : `single PUT (${numericLength} bytes)`}`);
|
||||
|
||||
const aws = makeAwsClient(env);
|
||||
|
||||
// ---- Upload to S3 ----
|
||||
const uploadStart = Date.now();
|
||||
try {
|
||||
if (contentLength && Number(contentLength) > 0) {
|
||||
// Known size → single streaming PUT (zero extra memory)
|
||||
if (!useMultipart) {
|
||||
// Known size ≤ 100 MiB → single streaming PUT (zero extra memory)
|
||||
await putObjectStreaming(
|
||||
aws,
|
||||
bucket,
|
||||
@@ -291,7 +418,7 @@ export default {
|
||||
endpoint,
|
||||
);
|
||||
} else {
|
||||
// Unknown size → multipart streaming upload (≤ 5 MiB buffer)
|
||||
// Unknown size or > 100 MiB → multipart streaming upload (≤ 5 MiB buffer)
|
||||
await multipartStreamUpload(
|
||||
aws,
|
||||
bucket,
|
||||
@@ -303,9 +430,15 @@ export default {
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
const elapsed = ((Date.now() - uploadStart) / 1000).toFixed(1);
|
||||
console.error(`[handler] ${requestId} S3 upload failed after ${elapsed}s: ${err.message}`);
|
||||
console.error(`[handler] ${requestId} S3 upload error stack: ${err.stack}`);
|
||||
return jsonResponse({ error: `S3 upload failed: ${err.message}` }, 502);
|
||||
}
|
||||
|
||||
const elapsed = ((Date.now() - uploadStart) / 1000).toFixed(1);
|
||||
console.log(`[handler] ${requestId} upload completed in ${elapsed}s`);
|
||||
|
||||
return jsonResponse({
|
||||
ok: true,
|
||||
bucket,
|
||||
|
||||
@@ -21,3 +21,6 @@ S3_ENDPOINT = ""
|
||||
[placement]
|
||||
# TODO: parameterize so users can explicitly set where they want the worker to run
|
||||
mode = "smart"
|
||||
|
||||
[observability]
|
||||
enabled = false
|
||||
|
||||
Reference in New Issue
Block a user