From 9c35374b043aedd5d84e89bd5dbe63dac068fda2 Mon Sep 17 00:00:00 2001 From: Diego Ripley Date: Fri, 13 Mar 2026 09:53:55 -0400 Subject: [PATCH] Change multipart upload size to 5 MiB, I was getting out of memory errors for the workers --- scripts/05_cloudflare_http_ingestor/README.md | 29 ++++---- .../05_cloudflare_http_ingestor/src/index.js | 71 ++++--------------- .../05_cloudflare_http_ingestor/wrangler.toml | 4 +- 3 files changed, 28 insertions(+), 76 deletions(-) diff --git a/scripts/05_cloudflare_http_ingestor/README.md b/scripts/05_cloudflare_http_ingestor/README.md index 88f0ef3..dde6f16 100644 --- a/scripts/05_cloudflare_http_ingestor/README.md +++ b/scripts/05_cloudflare_http_ingestor/README.md @@ -2,7 +2,9 @@ A Cloudflare Worker that acts as a secure proxy: it downloads a file from a URL provided in a JSON payload and streams it directly into an S3 bucket in -`us-west-2`, keeping memory usage constant regardless of file size. +`us-west-2`, keeping memory usage constant regardless of file size. All uploads +use multipart upload with 5 MiB chunks to stay well within the Workers 128 MiB +memory limit. ## Architecture @@ -19,15 +21,10 @@ Client PUT ──▶ Worker ──stream──▶ S3 PutObject / Multipart └─ Direct binary upload (X-S3-Key header) ``` -**Two upload paths are used automatically:** - -| Condition | Upload method | Memory overhead | -|---|---|---| -| Known size ≤ 100 MiB | Single streaming `PUT` | ~0 (pipe-through) | -| Unknown size **or** > 100 MiB | Multipart upload in 25 MiB chunks | ≤ 25 MiB | - -> Files larger than 100 MiB always use multipart upload because Cloudflare -> Workers enforce a body-size limit on single outbound `fetch()` requests. +**All uploads use S3 multipart upload with 5 MiB parts**, keeping peak memory +bounded to ~5 MiB regardless of file size. This avoids hitting the Cloudflare +Workers 128 MiB memory limit that can occur when buffering large single PUT +request bodies. ## Setup @@ -122,14 +119,14 @@ curl -X POST https://cf-data-ingestor.labs.dataforcanada.org \ "content_type": "application/x-msdownload", "size_bytes": 773722941, "etag": "abc123def456", - "multipart_part_size": 26214400, - "multipart_number_parts": 30, + "multipart_part_size": 5242880, + "multipart_number_parts": 148, "started_at": "2026-03-12T21:00:00.000Z", "finished_at": "2026-03-12T21:01:30.000Z" } ``` -> `multipart_part_size` and `multipart_number_parts` are only present when multipart upload was used (file > 100 MiB or unknown size). +> `multipart_part_size` and `multipart_number_parts` are always present since all uploads use multipart. ### Direct upload mode (PUT) @@ -150,7 +147,7 @@ Uploads a binary file body directly to S3. Useful for uploading local files | Header | Description | |---|---| | `Content-Type` | MIME type (default: `application/octet-stream`) | -| `Content-Length` | File size in bytes (enables single PUT for files ≤ 100 MiB) | +| `Content-Length` | File size in bytes | **Body:** Raw binary file content. @@ -190,8 +187,8 @@ curl -X PUT https://cf-data-ingestor.labs.dataforcanada.org \ | `content_type` | string | Yes | MIME type of the uploaded file | | `size_bytes` | number | When Content-Length known | File size in bytes | | `etag` | string | When available | S3 ETag (quotes stripped) | -| `multipart_part_size` | number | Only for multipart | Part size in bytes (25 MiB) | -| `multipart_number_parts` | number | Only for multipart | Number of parts uploaded | +| `multipart_part_size` | number | Yes | Part size in bytes (5 MiB) | +| `multipart_number_parts` | number | Yes | Number of parts uploaded | | `started_at` | string | Yes | ISO-8601 UTC timestamp when processing started | | `finished_at` | string | Yes | ISO-8601 UTC timestamp when processing finished | diff --git a/scripts/05_cloudflare_http_ingestor/src/index.js b/scripts/05_cloudflare_http_ingestor/src/index.js index 368c82b..7de0b3d 100644 --- a/scripts/05_cloudflare_http_ingestor/src/index.js +++ b/scripts/05_cloudflare_http_ingestor/src/index.js @@ -3,8 +3,7 @@ import { AwsClient } from "aws4fetch"; // --------------------------------------------------------------------------- // Constants // --------------------------------------------------------------------------- -const MIN_PART_SIZE = 25 * 1024 * 1024; // 25 MiB – keeps memory bounded on CF Workers (128 MiB limit) -const MAX_SINGLE_PUT_SIZE = 100 * 1024 * 1024; // 100 MiB – above this, always use multipart +const MIN_PART_SIZE = 5 * 1024 * 1024; // 5 MiB – keeps memory well within CF Workers 128 MiB limit const MAX_RETRIES = 3; const RETRY_BASE_DELAY_MS = 1000; // 1 second, doubles each retry @@ -99,40 +98,6 @@ function cleanEtag(etag) { return etag.replace(/^"/, "").replace(/"$/, ""); } -// --------------------------------------------------------------------------- -// S3 Upload – Single PUT (streaming, requires known Content-Length) -// --------------------------------------------------------------------------- - -async function putObjectStreaming(aws, bucket, region, key, body, contentLength, contentType, endpoint) { - const url = s3Url(bucket, key, region, endpoint); - console.log(`[putObjectStreaming] URL: ${url}`); - console.log(`[putObjectStreaming] Content-Type: ${contentType}, Content-Length: ${contentLength}`); - - const headers = { - "Content-Type": contentType || "application/octet-stream", - "Content-Length": String(contentLength), - "x-amz-acl": "bucket-owner-full-control", - "x-amz-content-sha256": "UNSIGNED-PAYLOAD", - }; - - console.log(`[putObjectStreaming] Sending PUT request...`); - const resp = await aws.fetch(url, { - method: "PUT", - headers, - body, // ReadableStream – streamed directly, no buffering - }); - - console.log(`[putObjectStreaming] Response status: ${resp.status}`); - if (!resp.ok) { - const text = await resp.text(); - console.error(`[putObjectStreaming] S3 PUT error body: ${text}`); - throw new Error(`S3 PUT failed (${resp.status}): ${text}`); - } - const etag = resp.headers.get("ETag"); - console.log(`[putObjectStreaming] PUT succeeded, ETag: ${etag}`); - return { resp, etag: cleanEtag(etag) }; -} - // --------------------------------------------------------------------------- // S3 Upload – Multipart (streaming, for unknown Content-Length) // --------------------------------------------------------------------------- @@ -332,28 +297,22 @@ async function multipartStreamUpload(aws, bucket, region, key, stream, contentTy } // --------------------------------------------------------------------------- -// Unified upload helper – decides single PUT vs multipart +// Unified upload helper – always uses multipart to keep memory bounded // --------------------------------------------------------------------------- /** - * Upload a ReadableStream to S3, choosing single PUT or multipart based on - * the known content length. + * Upload a ReadableStream to S3 using multipart upload. + * Always uses multipart (even for small files) to keep peak memory at ~5 MiB, + * well within the Cloudflare Workers 128 MiB limit. * - * Returns { etag, useMultipart, totalParts }. + * Returns { etag, totalParts }. */ async function doUpload(aws, bucket, region, key, stream, contentType, contentLength, endpoint) { const numericLength = contentLength ? Number(contentLength) : 0; - const useMultipart = !numericLength || numericLength > MAX_SINGLE_PUT_SIZE; + console.log(`[doUpload] key=${key}, strategy=multipart (size: ${numericLength || "unknown"})`); - console.log(`[doUpload] key=${key}, strategy=${useMultipart ? `multipart (size: ${numericLength || "unknown"})` : `single PUT (${numericLength} bytes)`}`); - - if (!useMultipart) { - const result = await putObjectStreaming(aws, bucket, region, key, stream, contentLength, contentType, endpoint); - return { etag: result.etag, useMultipart: false, totalParts: 0 }; - } else { - const result = await multipartStreamUpload(aws, bucket, region, key, stream, contentType, endpoint); - return { etag: result.etag, useMultipart: true, totalParts: result.totalParts }; - } + const result = await multipartStreamUpload(aws, bucket, region, key, stream, contentType, endpoint); + return { etag: result.etag, totalParts: result.totalParts }; } // --------------------------------------------------------------------------- @@ -496,10 +455,8 @@ async function handleDownloadAndUpload(request, env, requestId) { content_type: sourceContentType, ...(sizeBytes !== null ? { size_bytes: sizeBytes } : {}), ...(uploadResult.etag ? { etag: uploadResult.etag } : {}), - ...(uploadResult.useMultipart ? { - multipart_part_size: MIN_PART_SIZE, - multipart_number_parts: uploadResult.totalParts, - } : {}), + multipart_part_size: MIN_PART_SIZE, + multipart_number_parts: uploadResult.totalParts, started_at: startedAt, finished_at: finishedAt, }); @@ -568,10 +525,8 @@ async function handleDirectUpload(request, env, requestId) { content_type: contentType, ...(sizeBytes !== null ? { size_bytes: sizeBytes } : {}), ...(uploadResult.etag ? { etag: uploadResult.etag } : {}), - ...(uploadResult.useMultipart ? { - multipart_part_size: MIN_PART_SIZE, - multipart_number_parts: uploadResult.totalParts, - } : {}), + multipart_part_size: MIN_PART_SIZE, + multipart_number_parts: uploadResult.totalParts, started_at: startedAt, finished_at: finishedAt, }); diff --git a/scripts/05_cloudflare_http_ingestor/wrangler.toml b/scripts/05_cloudflare_http_ingestor/wrangler.toml index 91c48f2..e786bc8 100644 --- a/scripts/05_cloudflare_http_ingestor/wrangler.toml +++ b/scripts/05_cloudflare_http_ingestor/wrangler.toml @@ -20,8 +20,8 @@ S3_ENDPOINT = "" [placement] # TODO: parametize so user can specifically set where they want the worker to run -#mode = "smart" -hostname = "diffusion.mern.gouv.qc.ca" +mode = "smart" +#hostname = "diffusion.mern.gouv.qc.ca" [observability] enabled = true