mirror of
https://github.com/dataforcanada/d4c-infra-distribution.git
synced 2026-06-13 14:10:53 +02:00
Change multipart upload size to 5 MiB, I was getting out of memory errors for the workers
This commit is contained in:
@@ -2,7 +2,9 @@
|
||||
|
||||
A Cloudflare Worker that acts as a secure proxy: it downloads a file from a
|
||||
URL provided in a JSON payload and streams it directly into an S3 bucket in
|
||||
`us-west-2`, keeping memory usage constant regardless of file size.
|
||||
`us-west-2`, keeping memory usage constant regardless of file size. All uploads
|
||||
use multipart upload with 5 MiB chunks to stay well within the Workers 128 MiB
|
||||
memory limit.
|
||||
|
||||
## Architecture
|
||||
|
||||
@@ -19,15 +21,10 @@ Client PUT ──▶ Worker ──stream──▶ S3 PutObject / Multipart
|
||||
└─ Direct binary upload (X-S3-Key header)
|
||||
```
|
||||
|
||||
**Two upload paths are used automatically:**
|
||||
|
||||
| Condition | Upload method | Memory overhead |
|
||||
|---|---|---|
|
||||
| Known size ≤ 100 MiB | Single streaming `PUT` | ~0 (pipe-through) |
|
||||
| Unknown size **or** > 100 MiB | Multipart upload in 25 MiB chunks | ≤ 25 MiB |
|
||||
|
||||
> Files larger than 100 MiB always use multipart upload because Cloudflare
|
||||
> Workers enforce a body-size limit on single outbound `fetch()` requests.
|
||||
**All uploads use S3 multipart upload with 5 MiB parts**, keeping peak memory
|
||||
bounded to ~5 MiB regardless of file size. This avoids hitting the Cloudflare
|
||||
Workers 128 MiB memory limit that can occur when buffering large single PUT
|
||||
request bodies.
|
||||
|
||||
## Setup
|
||||
|
||||
@@ -122,14 +119,14 @@ curl -X POST https://cf-data-ingestor.labs.dataforcanada.org \
|
||||
"content_type": "application/x-msdownload",
|
||||
"size_bytes": 773722941,
|
||||
"etag": "abc123def456",
|
||||
"multipart_part_size": 26214400,
|
||||
"multipart_number_parts": 30,
|
||||
"multipart_part_size": 5242880,
|
||||
"multipart_number_parts": 148,
|
||||
"started_at": "2026-03-12T21:00:00.000Z",
|
||||
"finished_at": "2026-03-12T21:01:30.000Z"
|
||||
}
|
||||
```
|
||||
|
||||
> `multipart_part_size` and `multipart_number_parts` are only present when multipart upload was used (file > 100 MiB or unknown size).
|
||||
> `multipart_part_size` and `multipart_number_parts` are always present since all uploads use multipart.
|
||||
|
||||
### Direct upload mode (PUT)
|
||||
|
||||
@@ -150,7 +147,7 @@ Uploads a binary file body directly to S3. Useful for uploading local files
|
||||
| Header | Description |
|
||||
|---|---|
|
||||
| `Content-Type` | MIME type (default: `application/octet-stream`) |
|
||||
| `Content-Length` | File size in bytes (enables single PUT for files ≤ 100 MiB) |
|
||||
| `Content-Length` | File size in bytes |
|
||||
|
||||
**Body:** Raw binary file content.
|
||||
|
||||
@@ -190,8 +187,8 @@ curl -X PUT https://cf-data-ingestor.labs.dataforcanada.org \
|
||||
| `content_type` | string | Yes | MIME type of the uploaded file |
|
||||
| `size_bytes` | number | When Content-Length known | File size in bytes |
|
||||
| `etag` | string | When available | S3 ETag (quotes stripped) |
|
||||
| `multipart_part_size` | number | Only for multipart | Part size in bytes (25 MiB) |
|
||||
| `multipart_number_parts` | number | Only for multipart | Number of parts uploaded |
|
||||
| `multipart_part_size` | number | Yes | Part size in bytes (5 MiB) |
|
||||
| `multipart_number_parts` | number | Yes | Number of parts uploaded |
|
||||
| `started_at` | string | Yes | ISO-8601 UTC timestamp when processing started |
|
||||
| `finished_at` | string | Yes | ISO-8601 UTC timestamp when processing finished |
|
||||
|
||||
|
||||
@@ -3,8 +3,7 @@ import { AwsClient } from "aws4fetch";
|
||||
// ---------------------------------------------------------------------------
|
||||
// Constants
|
||||
// ---------------------------------------------------------------------------
|
||||
const MIN_PART_SIZE = 25 * 1024 * 1024; // 25 MiB – keeps memory bounded on CF Workers (128 MiB limit)
|
||||
const MAX_SINGLE_PUT_SIZE = 100 * 1024 * 1024; // 100 MiB – above this, always use multipart
|
||||
const MIN_PART_SIZE = 5 * 1024 * 1024; // 5 MiB – keeps memory well within CF Workers 128 MiB limit
|
||||
const MAX_RETRIES = 3;
|
||||
const RETRY_BASE_DELAY_MS = 1000; // 1 second, doubles each retry
|
||||
|
||||
@@ -99,40 +98,6 @@ function cleanEtag(etag) {
|
||||
return etag.replace(/^"/, "").replace(/"$/, "");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// S3 Upload – Single PUT (streaming, requires known Content-Length)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function putObjectStreaming(aws, bucket, region, key, body, contentLength, contentType, endpoint) {
|
||||
const url = s3Url(bucket, key, region, endpoint);
|
||||
console.log(`[putObjectStreaming] URL: ${url}`);
|
||||
console.log(`[putObjectStreaming] Content-Type: ${contentType}, Content-Length: ${contentLength}`);
|
||||
|
||||
const headers = {
|
||||
"Content-Type": contentType || "application/octet-stream",
|
||||
"Content-Length": String(contentLength),
|
||||
"x-amz-acl": "bucket-owner-full-control",
|
||||
"x-amz-content-sha256": "UNSIGNED-PAYLOAD",
|
||||
};
|
||||
|
||||
console.log(`[putObjectStreaming] Sending PUT request...`);
|
||||
const resp = await aws.fetch(url, {
|
||||
method: "PUT",
|
||||
headers,
|
||||
body, // ReadableStream – streamed directly, no buffering
|
||||
});
|
||||
|
||||
console.log(`[putObjectStreaming] Response status: ${resp.status}`);
|
||||
if (!resp.ok) {
|
||||
const text = await resp.text();
|
||||
console.error(`[putObjectStreaming] S3 PUT error body: ${text}`);
|
||||
throw new Error(`S3 PUT failed (${resp.status}): ${text}`);
|
||||
}
|
||||
const etag = resp.headers.get("ETag");
|
||||
console.log(`[putObjectStreaming] PUT succeeded, ETag: ${etag}`);
|
||||
return { resp, etag: cleanEtag(etag) };
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// S3 Upload – Multipart (streaming, for unknown Content-Length)
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -332,28 +297,22 @@ async function multipartStreamUpload(aws, bucket, region, key, stream, contentTy
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Unified upload helper – decides single PUT vs multipart
|
||||
// Unified upload helper – always uses multipart to keep memory bounded
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Upload a ReadableStream to S3, choosing single PUT or multipart based on
|
||||
* the known content length.
|
||||
* Upload a ReadableStream to S3 using multipart upload.
|
||||
* Always uses multipart (even for small files) to keep peak memory at ~5 MiB,
|
||||
* well within the Cloudflare Workers 128 MiB limit.
|
||||
*
|
||||
* Returns { etag, useMultipart, totalParts }.
|
||||
* Returns { etag, totalParts }.
|
||||
*/
|
||||
async function doUpload(aws, bucket, region, key, stream, contentType, contentLength, endpoint) {
|
||||
const numericLength = contentLength ? Number(contentLength) : 0;
|
||||
const useMultipart = !numericLength || numericLength > MAX_SINGLE_PUT_SIZE;
|
||||
console.log(`[doUpload] key=${key}, strategy=multipart (size: ${numericLength || "unknown"})`);
|
||||
|
||||
console.log(`[doUpload] key=${key}, strategy=${useMultipart ? `multipart (size: ${numericLength || "unknown"})` : `single PUT (${numericLength} bytes)`}`);
|
||||
|
||||
if (!useMultipart) {
|
||||
const result = await putObjectStreaming(aws, bucket, region, key, stream, contentLength, contentType, endpoint);
|
||||
return { etag: result.etag, useMultipart: false, totalParts: 0 };
|
||||
} else {
|
||||
const result = await multipartStreamUpload(aws, bucket, region, key, stream, contentType, endpoint);
|
||||
return { etag: result.etag, useMultipart: true, totalParts: result.totalParts };
|
||||
}
|
||||
const result = await multipartStreamUpload(aws, bucket, region, key, stream, contentType, endpoint);
|
||||
return { etag: result.etag, totalParts: result.totalParts };
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -496,10 +455,8 @@ async function handleDownloadAndUpload(request, env, requestId) {
|
||||
content_type: sourceContentType,
|
||||
...(sizeBytes !== null ? { size_bytes: sizeBytes } : {}),
|
||||
...(uploadResult.etag ? { etag: uploadResult.etag } : {}),
|
||||
...(uploadResult.useMultipart ? {
|
||||
multipart_part_size: MIN_PART_SIZE,
|
||||
multipart_number_parts: uploadResult.totalParts,
|
||||
} : {}),
|
||||
multipart_part_size: MIN_PART_SIZE,
|
||||
multipart_number_parts: uploadResult.totalParts,
|
||||
started_at: startedAt,
|
||||
finished_at: finishedAt,
|
||||
});
|
||||
@@ -568,10 +525,8 @@ async function handleDirectUpload(request, env, requestId) {
|
||||
content_type: contentType,
|
||||
...(sizeBytes !== null ? { size_bytes: sizeBytes } : {}),
|
||||
...(uploadResult.etag ? { etag: uploadResult.etag } : {}),
|
||||
...(uploadResult.useMultipart ? {
|
||||
multipart_part_size: MIN_PART_SIZE,
|
||||
multipart_number_parts: uploadResult.totalParts,
|
||||
} : {}),
|
||||
multipart_part_size: MIN_PART_SIZE,
|
||||
multipart_number_parts: uploadResult.totalParts,
|
||||
started_at: startedAt,
|
||||
finished_at: finishedAt,
|
||||
});
|
||||
|
||||
@@ -20,8 +20,8 @@ S3_ENDPOINT = ""
|
||||
|
||||
[placement]
|
||||
# TODO: parametize so user can specifically set where they want the worker to run
|
||||
#mode = "smart"
|
||||
hostname = "diffusion.mern.gouv.qc.ca"
|
||||
mode = "smart"
|
||||
#hostname = "diffusion.mern.gouv.qc.ca"
|
||||
|
||||
[observability]
|
||||
enabled = true
|
||||
|
||||
Reference in New Issue
Block a user