diff --git a/scripts/05_cloudflare_http_ingestor/README.md b/scripts/05_cloudflare_http_ingestor/README.md index f410c46..ab86ccd 100644 --- a/scripts/05_cloudflare_http_ingestor/README.md +++ b/scripts/05_cloudflare_http_ingestor/README.md @@ -16,10 +16,13 @@ Client POST ──▶ Worker ──stream──▶ S3 PutObject / Multipart **Two upload paths are used automatically:** -| Source provides `Content-Length`? | Upload method | Memory overhead | +| Condition | Upload method | Memory overhead | |---|---|---| -| Yes | Single streaming `PUT` | ~0 (pipe-through) | -| No | Multipart upload in 5 MiB chunks | ≤ 5 MiB | +| Known size ≤ 100 MiB | Single streaming `PUT` | ~0 (pipe-through) | +| Unknown size **or** > 100 MiB | Multipart upload in 5 MiB chunks | ≤ 5 MiB | + +> Files larger than 100 MiB always use multipart upload because Cloudflare +> Workers enforce a body-size limit on single outbound `fetch()` requests. ## Setup diff --git a/scripts/05_cloudflare_http_ingestor/src/index.js b/scripts/05_cloudflare_http_ingestor/src/index.js index 633b0cd..50f83fa 100644 --- a/scripts/05_cloudflare_http_ingestor/src/index.js +++ b/scripts/05_cloudflare_http_ingestor/src/index.js @@ -4,6 +4,9 @@ import { AwsClient } from "aws4fetch"; // Constants // --------------------------------------------------------------------------- const MIN_PART_SIZE = 5 * 1024 * 1024; // 5 MiB – S3 minimum for multipart parts +const MAX_SINGLE_PUT_SIZE = 100 * 1024 * 1024; // 100 MiB – above this, always use multipart +const MAX_RETRIES = 3; +const RETRY_BASE_DELAY_MS = 1000; // 1 second, doubles each retry // --------------------------------------------------------------------------- // Helpers @@ -62,28 +65,64 @@ function s3Url(bucket, key, region, endpoint) { return `${base}/${bucket}/${encodeURI(key)}`; } +/** Sleep for the given number of milliseconds. */ +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** + * Determine if an error is retryable (transient network / server errors). + */ +function isRetryable(err) { + const msg = (err.message || "").toLowerCase(); + // Network-level failures + if ( + msg.includes("network") || + msg.includes("connection") || + msg.includes("timeout") || + msg.includes("socket") || + msg.includes("econnreset") || + msg.includes("fetch failed") + ) { + return true; + } + // S3 server-side errors (5xx) + if (/\b5\d{2}\b/.test(msg)) { + return true; + } + return false; +} + // --------------------------------------------------------------------------- // S3 Upload – Single PUT (streaming, requires known Content-Length) // --------------------------------------------------------------------------- async function putObjectStreaming(aws, bucket, region, key, body, contentLength, contentType, endpoint) { const url = s3Url(bucket, key, region, endpoint); + console.log(`[putObjectStreaming] URL: ${url}`); + console.log(`[putObjectStreaming] Content-Type: ${contentType}, Content-Length: ${contentLength}`); + const headers = { "Content-Type": contentType || "application/octet-stream", "Content-Length": String(contentLength), "x-amz-acl": "bucket-owner-full-control", + "x-amz-content-sha256": "UNSIGNED-PAYLOAD", }; + console.log(`[putObjectStreaming] Sending PUT request...`); const resp = await aws.fetch(url, { method: "PUT", headers, body, // ReadableStream – streamed directly, no buffering }); + console.log(`[putObjectStreaming] Response status: ${resp.status}`); if (!resp.ok) { const text = await resp.text(); + console.error(`[putObjectStreaming] S3 PUT error body: ${text}`); throw new Error(`S3 PUT failed (${resp.status}): ${text}`); } + console.log(`[putObjectStreaming] PUT succeeded`); return resp; } @@ -93,62 +132,115 @@ async function putObjectStreaming(aws, bucket, region, key, body, contentLength, async function initiateMultipart(aws, bucket, region, key, contentType, endpoint) { const url = `${s3Url(bucket, key, region, endpoint)}?uploads`; + console.log(`[initiateMultipart] URL: ${url}`); + const resp = await aws.fetch(url, { method: "POST", headers: { "Content-Type": contentType || "application/octet-stream", "x-amz-acl": "bucket-owner-full-control", + "x-amz-content-sha256": "UNSIGNED-PAYLOAD", }, }); + + console.log(`[initiateMultipart] Response status: ${resp.status}`); if (!resp.ok) { const text = await resp.text(); + console.error(`[initiateMultipart] Error body: ${text}`); throw new Error(`Initiate multipart failed (${resp.status}): ${text}`); } const xml = await resp.text(); const match = xml.match(/(.+?)<\/UploadId>/); - if (!match) throw new Error("Could not parse UploadId from response"); + if (!match) { + console.error(`[initiateMultipart] Could not parse UploadId from XML: ${xml}`); + throw new Error("Could not parse UploadId from response"); + } + console.log(`[initiateMultipart] UploadId: ${match[1]}`); return match[1]; } async function uploadPart(aws, bucket, region, key, uploadId, partNumber, body, length, endpoint) { const url = `${s3Url(bucket, key, region, endpoint)}?partNumber=${partNumber}&uploadId=${encodeURIComponent(uploadId)}`; + console.log(`[uploadPart] Part ${partNumber}, size: ${length} bytes, URL: ${url}`); + const resp = await aws.fetch(url, { method: "PUT", - headers: { "Content-Length": String(length) }, + headers: { + "Content-Length": String(length), + "x-amz-content-sha256": "UNSIGNED-PAYLOAD", + }, body, }); + + console.log(`[uploadPart] Part ${partNumber} response status: ${resp.status}`); if (!resp.ok) { const text = await resp.text(); + console.error(`[uploadPart] Part ${partNumber} error body: ${text}`); throw new Error(`Upload part ${partNumber} failed (${resp.status}): ${text}`); } const etag = resp.headers.get("ETag"); + console.log(`[uploadPart] Part ${partNumber} ETag: ${etag}`); return etag; } +/** + * Upload a single part with retry logic for transient failures. + * The body (Blob) can be re-read on each attempt. + */ +async function uploadPartWithRetry(aws, bucket, region, key, uploadId, partNumber, blob, length, endpoint) { + let lastErr; + for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) { + try { + // Each retry needs a fresh body since the previous stream may be consumed + const etag = await uploadPart(aws, bucket, region, key, uploadId, partNumber, blob, length, endpoint); + return etag; + } catch (err) { + lastErr = err; + if (attempt < MAX_RETRIES && isRetryable(err)) { + const delay = RETRY_BASE_DELAY_MS * Math.pow(2, attempt - 1); + console.warn(`[uploadPartWithRetry] Part ${partNumber} attempt ${attempt} failed (retryable): ${err.message}. Retrying in ${delay}ms...`); + await sleep(delay); + } else { + console.error(`[uploadPartWithRetry] Part ${partNumber} attempt ${attempt} failed (non-retryable or max retries): ${err.message}`); + break; + } + } + } + throw lastErr; +} + async function completeMultipart(aws, bucket, region, key, uploadId, parts, endpoint) { const partsXml = parts .map((p) => `${p.partNumber}${p.etag}`) .join(""); const xmlBody = `${partsXml}`; const url = `${s3Url(bucket, key, region, endpoint)}?uploadId=${encodeURIComponent(uploadId)}`; + console.log(`[completeMultipart] URL: ${url}, parts: ${parts.length}`); + const resp = await aws.fetch(url, { method: "POST", headers: { "Content-Type": "application/xml" }, body: xmlBody, }); + + console.log(`[completeMultipart] Response status: ${resp.status}`); if (!resp.ok) { const text = await resp.text(); + console.error(`[completeMultipart] Error body: ${text}`); throw new Error(`Complete multipart failed (${resp.status}): ${text}`); } + console.log(`[completeMultipart] Multipart upload completed successfully`); return resp; } async function abortMultipart(aws, bucket, region, key, uploadId, endpoint) { const url = `${s3Url(bucket, key, region, endpoint)}?uploadId=${encodeURIComponent(uploadId)}`; + console.log(`[abortMultipart] Aborting upload ${uploadId}, URL: ${url}`); try { await aws.fetch(url, { method: "DELETE" }); - } catch { - // best-effort cleanup + console.log(`[abortMultipart] Abort succeeded`); + } catch (abortErr) { + console.error(`[abortMultipart] Abort failed (best-effort): ${abortErr.message}`); } } @@ -157,26 +249,33 @@ async function abortMultipart(aws, bucket, region, key, uploadId, endpoint) { * S3 multipart part. Memory usage stays bounded to ~MIN_PART_SIZE at a time. */ async function multipartStreamUpload(aws, bucket, region, key, stream, contentType, endpoint) { + console.log(`[multipartStreamUpload] Starting multipart upload for key: ${key}`); const uploadId = await initiateMultipart(aws, bucket, region, key, contentType, endpoint); const parts = []; let partNumber = 1; let buffer = []; let bufferSize = 0; + let totalBytesRead = 0; const reader = stream.getReader(); try { while (true) { const { done, value } = await reader.read(); - if (done) break; + if (done) { + console.log(`[multipartStreamUpload] Stream finished. Total bytes read: ${totalBytesRead}`); + break; + } buffer.push(value); bufferSize += value.byteLength; + totalBytesRead += value.byteLength; // Flush when we've accumulated enough for a part if (bufferSize >= MIN_PART_SIZE) { + console.log(`[multipartStreamUpload] Flushing part ${partNumber}, buffer size: ${bufferSize}`); const blob = new Blob(buffer); - const etag = await uploadPart(aws, bucket, region, key, uploadId, partNumber, blob, bufferSize, endpoint); + const etag = await uploadPartWithRetry(aws, bucket, region, key, uploadId, partNumber, blob, bufferSize, endpoint); parts.push({ partNumber, etag }); partNumber++; buffer = []; @@ -186,13 +285,17 @@ async function multipartStreamUpload(aws, bucket, region, key, stream, contentTy // Upload remaining bytes as the final part if (bufferSize > 0) { + console.log(`[multipartStreamUpload] Uploading final part ${partNumber}, size: ${bufferSize}`); const blob = new Blob(buffer); - const etag = await uploadPart(aws, bucket, region, key, uploadId, partNumber, blob, bufferSize, endpoint); + const etag = await uploadPartWithRetry(aws, bucket, region, key, uploadId, partNumber, blob, bufferSize, endpoint); parts.push({ partNumber, etag }); } await completeMultipart(aws, bucket, region, key, uploadId, parts, endpoint); + console.log(`[multipartStreamUpload] Upload complete. Total parts: ${parts.length}, total bytes: ${totalBytesRead}`); } catch (err) { + console.error(`[multipartStreamUpload] Upload failed at part ${partNumber}: ${err.message}`); + console.error(`[multipartStreamUpload] Error stack: ${err.stack}`); await abortMultipart(aws, bucket, region, key, uploadId, endpoint); throw err; } @@ -204,8 +307,12 @@ async function multipartStreamUpload(aws, bucket, region, key, stream, contentTy export default { async fetch(request, env) { + const requestId = crypto.randomUUID(); + console.log(`[handler] Request ${requestId} received: ${request.method} ${request.url}`); + // ---- Method check ---- if (request.method !== "POST") { + console.log(`[handler] ${requestId} rejected: method ${request.method}`); return textResponse("Method Not Allowed. Use POST.", 405); } @@ -213,12 +320,14 @@ export default { const authHeader = request.headers.get("Authorization") || ""; const token = authHeader.startsWith("Bearer ") ? authHeader.slice(7) : ""; if (!token || token !== env.AUTH_TOKEN) { + console.log(`[handler] ${requestId} rejected: unauthorized`); return textResponse("Unauthorized", 401); } // ---- Content-Type check ---- const ct = request.headers.get("Content-Type") || ""; if (!ct.includes("application/json")) { + console.log(`[handler] ${requestId} rejected: Content-Type "${ct}"`); return textResponse("Content-Type must be application/json", 415); } @@ -226,12 +335,16 @@ export default { let payload; try { payload = await request.json(); - } catch { + } catch (parseErr) { + console.error(`[handler] ${requestId} JSON parse error: ${parseErr.message}`); return textResponse("Invalid JSON body", 400); } const { download_url, user_agent, key_prefix } = payload; + console.log(`[handler] ${requestId} payload: download_url=${download_url}, user_agent=${user_agent}, key_prefix=${key_prefix || "(none)"}`); + if (!download_url || !user_agent) { + console.log(`[handler] ${requestId} rejected: missing required fields`); return jsonResponse( { error: "'download_url' and 'user_agent' are required." }, 400, @@ -244,10 +357,12 @@ export default { parsedUrl = new URL(download_url); if (!["http:", "https:"].includes(parsedUrl.protocol)) throw new Error(); } catch { + console.log(`[handler] ${requestId} rejected: invalid download_url`); return jsonResponse({ error: "Invalid download_url" }, 400); } // ---- Fetch the source file (streaming) ---- + console.log(`[handler] ${requestId} fetching source: ${download_url}`); let sourceResp; try { sourceResp = await fetch(download_url, { @@ -255,10 +370,15 @@ export default { redirect: "follow", }); } catch (err) { + console.error(`[handler] ${requestId} download fetch error: ${err.message}`); + console.error(`[handler] ${requestId} download fetch stack: ${err.stack}`); return jsonResponse({ error: `Download failed: ${err.message}` }, 502); } + console.log(`[handler] ${requestId} source response: status=${sourceResp.status}, Content-Type=${sourceResp.headers.get("Content-Type")}, Content-Length=${sourceResp.headers.get("Content-Length")}`); + if (!sourceResp.ok) { + console.error(`[handler] ${requestId} source returned non-OK: ${sourceResp.status}`); return jsonResponse( { error: `Source returned HTTP ${sourceResp.status}` }, 502, @@ -274,12 +394,19 @@ export default { sourceResp.headers.get("Content-Type") || "application/octet-stream"; const contentLength = sourceResp.headers.get("Content-Length"); + const numericLength = contentLength ? Number(contentLength) : 0; + const useMultipart = !numericLength || numericLength > MAX_SINGLE_PUT_SIZE; + + console.log(`[handler] ${requestId} S3 target: bucket=${bucket}, region=${region}, endpoint=${endpoint || "(default)"}, key=${key}`); + console.log(`[handler] ${requestId} upload strategy: ${useMultipart ? `multipart (size: ${numericLength || "unknown"})` : `single PUT (${numericLength} bytes)`}`); + const aws = makeAwsClient(env); // ---- Upload to S3 ---- + const uploadStart = Date.now(); try { - if (contentLength && Number(contentLength) > 0) { - // Known size → single streaming PUT (zero extra memory) + if (!useMultipart) { + // Known size ≤ 100 MiB → single streaming PUT (zero extra memory) await putObjectStreaming( aws, bucket, @@ -291,7 +418,7 @@ export default { endpoint, ); } else { - // Unknown size → multipart streaming upload (≤ 5 MiB buffer) + // Unknown size or > 100 MiB → multipart streaming upload (≤ 5 MiB buffer) await multipartStreamUpload( aws, bucket, @@ -303,9 +430,15 @@ export default { ); } } catch (err) { + const elapsed = ((Date.now() - uploadStart) / 1000).toFixed(1); + console.error(`[handler] ${requestId} S3 upload failed after ${elapsed}s: ${err.message}`); + console.error(`[handler] ${requestId} S3 upload error stack: ${err.stack}`); return jsonResponse({ error: `S3 upload failed: ${err.message}` }, 502); } + const elapsed = ((Date.now() - uploadStart) / 1000).toFixed(1); + console.log(`[handler] ${requestId} upload completed in ${elapsed}s`); + return jsonResponse({ ok: true, bucket, diff --git a/scripts/05_cloudflare_http_ingestor/wrangler.toml b/scripts/05_cloudflare_http_ingestor/wrangler.toml index 678c084..e613eef 100644 --- a/scripts/05_cloudflare_http_ingestor/wrangler.toml +++ b/scripts/05_cloudflare_http_ingestor/wrangler.toml @@ -21,3 +21,6 @@ S3_ENDPOINT = "" [placement] # TODO: parametize so user can specifically set where they want the worker to run mode = "smart" + +[observability] +enabled = false