Updates to Cloudflare HTTP ingestor. I tried to calculate different file hashes and failed miserably

This commit is contained in:
Diego Ripley
2026-03-13 07:16:47 -04:00
parent d3b5d69571
commit d4484b665f
3 changed files with 372 additions and 149 deletions
+275 -137
View File
@@ -3,7 +3,7 @@ import { AwsClient } from "aws4fetch";
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
const MIN_PART_SIZE = 5 * 1024 * 1024; // 5 MiB S3 minimum for multipart parts
const MIN_PART_SIZE = 25 * 1024 * 1024; // 25 MiB keeps memory bounded on CF Workers (128 MiB limit)
const MAX_SINGLE_PUT_SIZE = 100 * 1024 * 1024; // 100 MiB above this, always use multipart
const MAX_RETRIES = 3;
const RETRY_BASE_DELAY_MS = 1000; // 1 second, doubles each retry
@@ -93,6 +93,12 @@ function isRetryable(err) {
return false;
}
/** Strip surrounding double-quotes from an ETag string (S3 returns them quoted). */
function cleanEtag(etag) {
if (!etag) return null;
return etag.replace(/^"/, "").replace(/"$/, "");
}
// ---------------------------------------------------------------------------
// S3 Upload Single PUT (streaming, requires known Content-Length)
// ---------------------------------------------------------------------------
@@ -122,8 +128,9 @@ async function putObjectStreaming(aws, bucket, region, key, body, contentLength,
console.error(`[putObjectStreaming] S3 PUT error body: ${text}`);
throw new Error(`S3 PUT failed (${resp.status}): ${text}`);
}
console.log(`[putObjectStreaming] PUT succeeded`);
return resp;
const etag = resp.headers.get("ETag");
console.log(`[putObjectStreaming] PUT succeeded, ETag: ${etag}`);
return { resp, etag: cleanEtag(etag) };
}
// ---------------------------------------------------------------------------
@@ -191,7 +198,6 @@ async function uploadPartWithRetry(aws, bucket, region, key, uploadId, partNumbe
let lastErr;
for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
try {
// Each retry needs a fresh body since the previous stream may be consumed
const etag = await uploadPart(aws, bucket, region, key, uploadId, partNumber, blob, length, endpoint);
return etag;
} catch (err) {
@@ -229,8 +235,11 @@ async function completeMultipart(aws, bucket, region, key, uploadId, parts, endp
console.error(`[completeMultipart] Error body: ${text}`);
throw new Error(`Complete multipart failed (${resp.status}): ${text}`);
}
console.log(`[completeMultipart] Multipart upload completed successfully`);
return resp;
const xml = await resp.text();
const etagMatch = xml.match(/<ETag>(.+?)<\/ETag>/);
const etag = etagMatch ? etagMatch[1] : resp.headers.get("ETag");
console.log(`[completeMultipart] Multipart upload completed successfully, ETag: ${etag}`);
return { resp, etag: cleanEtag(etag) };
}
async function abortMultipart(aws, bucket, region, key, uploadId, endpoint) {
@@ -244,17 +253,35 @@ async function abortMultipart(aws, bucket, region, key, uploadId, endpoint) {
}
}
/**
* Concatenate an array of Uint8Array chunks into a single Uint8Array.
* This replaces the Blob-based approach to avoid holding both the chunk
* array *and* the Blob in memory simultaneously.
*/
function concatChunks(chunks, totalLength) {
const result = new Uint8Array(totalLength);
let offset = 0;
for (let i = 0; i < chunks.length; i++) {
result.set(chunks[i], offset);
offset += chunks[i].byteLength;
chunks[i] = null; // release reference to each chunk eagerly
}
return result;
}
/**
* Read from a ReadableStream in ≥ MIN_PART_SIZE chunks and upload each as an
* S3 multipart part. Memory usage stays bounded to ~MIN_PART_SIZE at a time.
*
* Returns { etag, totalBytesRead, totalParts }.
*/
async function multipartStreamUpload(aws, bucket, region, key, stream, contentType, endpoint) {
console.log(`[multipartStreamUpload] Starting multipart upload for key: ${key}`);
const uploadId = await initiateMultipart(aws, bucket, region, key, contentType, endpoint);
const parts = [];
let partNumber = 1;
let buffer = [];
let bufferSize = 0;
let chunks = [];
let chunksSize = 0;
let totalBytesRead = 0;
const reader = stream.getReader();
@@ -267,32 +294,35 @@ async function multipartStreamUpload(aws, bucket, region, key, stream, contentTy
break;
}
buffer.push(value);
bufferSize += value.byteLength;
chunks.push(value);
chunksSize += value.byteLength;
totalBytesRead += value.byteLength;
// Flush when we've accumulated enough for a part
if (bufferSize >= MIN_PART_SIZE) {
console.log(`[multipartStreamUpload] Flushing part ${partNumber}, buffer size: ${bufferSize}`);
const blob = new Blob(buffer);
const etag = await uploadPartWithRetry(aws, bucket, region, key, uploadId, partNumber, blob, bufferSize, endpoint);
if (chunksSize >= MIN_PART_SIZE) {
console.log(`[multipartStreamUpload] Flushing part ${partNumber}, buffer size: ${chunksSize}`);
const partBody = concatChunks(chunks, chunksSize);
chunks = []; // release the chunk array immediately
chunksSize = 0;
const etag = await uploadPartWithRetry(aws, bucket, region, key, uploadId, partNumber, partBody, partBody.byteLength, endpoint);
parts.push({ partNumber, etag });
partNumber++;
buffer = [];
bufferSize = 0;
}
}
// Upload remaining bytes as the final part
if (bufferSize > 0) {
console.log(`[multipartStreamUpload] Uploading final part ${partNumber}, size: ${bufferSize}`);
const blob = new Blob(buffer);
const etag = await uploadPartWithRetry(aws, bucket, region, key, uploadId, partNumber, blob, bufferSize, endpoint);
if (chunksSize > 0) {
console.log(`[multipartStreamUpload] Uploading final part ${partNumber}, size: ${chunksSize}`);
const partBody = concatChunks(chunks, chunksSize);
chunks = [];
chunksSize = 0;
const etag = await uploadPartWithRetry(aws, bucket, region, key, uploadId, partNumber, partBody, partBody.byteLength, endpoint);
parts.push({ partNumber, etag });
}
await completeMultipart(aws, bucket, region, key, uploadId, parts, endpoint);
console.log(`[multipartStreamUpload] Upload complete. Total parts: ${parts.length}, total bytes: ${totalBytesRead}`);
const { etag } = await completeMultipart(aws, bucket, region, key, uploadId, parts, endpoint);
console.log(`[multipartStreamUpload] Upload complete. Total parts: ${parts.length}, total bytes: ${totalBytesRead}, ETag: ${etag}`);
return { etag, totalBytesRead, totalParts: parts.length };
} catch (err) {
console.error(`[multipartStreamUpload] Upload failed at part ${partNumber}: ${err.message}`);
console.error(`[multipartStreamUpload] Error stack: ${err.stack}`);
@@ -301,6 +331,45 @@ async function multipartStreamUpload(aws, bucket, region, key, stream, contentTy
}
}
// ---------------------------------------------------------------------------
// Unified upload helper decides single PUT vs multipart
// ---------------------------------------------------------------------------
/**
* Upload a ReadableStream to S3, choosing single PUT or multipart based on
* the known content length.
*
* Returns { etag, useMultipart, totalParts }.
*/
async function doUpload(aws, bucket, region, key, stream, contentType, contentLength, endpoint) {
const numericLength = contentLength ? Number(contentLength) : 0;
const useMultipart = !numericLength || numericLength > MAX_SINGLE_PUT_SIZE;
console.log(`[doUpload] key=${key}, strategy=${useMultipart ? `multipart (size: ${numericLength || "unknown"})` : `single PUT (${numericLength} bytes)`}`);
if (!useMultipart) {
const result = await putObjectStreaming(aws, bucket, region, key, stream, contentLength, contentType, endpoint);
return { etag: result.etag, useMultipart: false, totalParts: 0 };
} else {
const result = await multipartStreamUpload(aws, bucket, region, key, stream, contentType, endpoint);
return { etag: result.etag, useMultipart: true, totalParts: result.totalParts };
}
}
// ---------------------------------------------------------------------------
// Auth helper
// ---------------------------------------------------------------------------
/** Validate the Bearer token. Returns null on success, or an error Response. */
function checkAuth(request, env) {
const authHeader = request.headers.get("Authorization") || "";
const token = authHeader.startsWith("Bearer ") ? authHeader.slice(7) : "";
if (!token || token !== env.AUTH_TOKEN) {
return textResponse("Unauthorized", 401);
}
return null;
}
// ---------------------------------------------------------------------------
// Main handler
// ---------------------------------------------------------------------------
@@ -310,141 +379,210 @@ export default {
const requestId = crypto.randomUUID();
console.log(`[handler] Request ${requestId} received: ${request.method} ${request.url}`);
// ---- Method check ----
if (request.method !== "POST") {
console.log(`[handler] ${requestId} rejected: method ${request.method}`);
return textResponse("Method Not Allowed. Use POST.", 405);
// ---- Route by method ----
if (request.method === "PUT") {
return handleDirectUpload(request, env, requestId);
}
if (request.method === "POST") {
return handleDownloadAndUpload(request, env, requestId);
}
// ---- Auth check ----
const authHeader = request.headers.get("Authorization") || "";
const token = authHeader.startsWith("Bearer ") ? authHeader.slice(7) : "";
if (!token || token !== env.AUTH_TOKEN) {
console.log(`[handler] ${requestId} rejected: unauthorized`);
return textResponse("Unauthorized", 401);
}
console.log(`[handler] ${requestId} rejected: method ${request.method}`);
return textResponse("Method Not Allowed. Use POST or PUT.", 405);
},
};
// ---- Content-Type check ----
const ct = request.headers.get("Content-Type") || "";
if (!ct.includes("application/json")) {
console.log(`[handler] ${requestId} rejected: Content-Type "${ct}"`);
return textResponse("Content-Type must be application/json", 415);
}
// ---------------------------------------------------------------------------
// POST handler download from URL and upload to S3
// ---------------------------------------------------------------------------
// ---- Parse body ----
let payload;
try {
payload = await request.json();
} catch (parseErr) {
console.error(`[handler] ${requestId} JSON parse error: ${parseErr.message}`);
return textResponse("Invalid JSON body", 400);
}
async function handleDownloadAndUpload(request, env, requestId) {
const startedAt = new Date().toISOString();
const { download_url, user_agent, key_prefix } = payload;
console.log(`[handler] ${requestId} payload: download_url=${download_url}, user_agent=${user_agent}, key_prefix=${key_prefix || "(none)"}`);
// ---- Auth check ----
const authErr = checkAuth(request, env);
if (authErr) {
console.log(`[handler] ${requestId} rejected: unauthorized`);
return authErr;
}
if (!download_url || !user_agent) {
console.log(`[handler] ${requestId} rejected: missing required fields`);
return jsonResponse(
{ error: "'download_url' and 'user_agent' are required." },
400,
);
}
// ---- Content-Type check ----
const ct = request.headers.get("Content-Type") || "";
if (!ct.includes("application/json")) {
console.log(`[handler] ${requestId} rejected: Content-Type "${ct}"`);
return textResponse("Content-Type must be application/json", 415);
}
// Validate the download URL
let parsedUrl;
try {
parsedUrl = new URL(download_url);
if (!["http:", "https:"].includes(parsedUrl.protocol)) throw new Error();
} catch {
console.log(`[handler] ${requestId} rejected: invalid download_url`);
return jsonResponse({ error: "Invalid download_url" }, 400);
}
// ---- Parse body ----
let payload;
try {
payload = await request.json();
} catch (parseErr) {
console.error(`[handler] ${requestId} JSON parse error: ${parseErr.message}`);
return textResponse("Invalid JSON body", 400);
}
// ---- Fetch the source file (streaming) ----
console.log(`[handler] ${requestId} fetching source: ${download_url}`);
let sourceResp;
try {
sourceResp = await fetch(download_url, {
headers: { "User-Agent": user_agent },
redirect: "follow",
});
} catch (err) {
console.error(`[handler] ${requestId} download fetch error: ${err.message}`);
console.error(`[handler] ${requestId} download fetch stack: ${err.stack}`);
return jsonResponse({ error: `Download failed: ${err.message}` }, 502);
}
const { download_url, user_agent, key_prefix } = payload;
console.log(`[handler] ${requestId} payload: download_url=${download_url}, user_agent=${user_agent}, key_prefix=${key_prefix || "(none)"}`);
console.log(`[handler] ${requestId} source response: status=${sourceResp.status}, Content-Type=${sourceResp.headers.get("Content-Type")}, Content-Length=${sourceResp.headers.get("Content-Length")}`);
if (!download_url || !user_agent) {
console.log(`[handler] ${requestId} rejected: missing required fields`);
return jsonResponse(
{ error: "'download_url' and 'user_agent' are required." },
400,
);
}
if (!sourceResp.ok) {
console.error(`[handler] ${requestId} source returned non-OK: ${sourceResp.status}`);
return jsonResponse(
{ error: `Source returned HTTP ${sourceResp.status}` },
502,
);
}
// Validate the download URL
let parsedUrl;
try {
parsedUrl = new URL(download_url);
if (!["http:", "https:"].includes(parsedUrl.protocol)) throw new Error();
} catch {
console.log(`[handler] ${requestId} rejected: invalid download_url`);
return jsonResponse({ error: "Invalid download_url" }, 400);
}
// ---- Prepare S3 parameters ----
const bucket = env.S3_BUCKET;
const region = env.S3_REGION || "us-west-2";
const endpoint = env.S3_ENDPOINT || "";
const key = objectKeyFromUrl(download_url, key_prefix || "");
const sourceContentType =
sourceResp.headers.get("Content-Type") || "application/octet-stream";
const contentLength = sourceResp.headers.get("Content-Length");
// ---- Fetch the source file (streaming) ----
console.log(`[handler] ${requestId} fetching source: ${download_url}`);
let sourceResp;
try {
sourceResp = await fetch(download_url, {
headers: { "User-Agent": user_agent },
redirect: "follow",
});
} catch (err) {
console.error(`[handler] ${requestId} download fetch error: ${err.message}`);
console.error(`[handler] ${requestId} download fetch stack: ${err.stack}`);
return jsonResponse({ error: `Download failed: ${err.message}` }, 502);
}
const numericLength = contentLength ? Number(contentLength) : 0;
const useMultipart = !numericLength || numericLength > MAX_SINGLE_PUT_SIZE;
console.log(`[handler] ${requestId} source response: status=${sourceResp.status}, Content-Type=${sourceResp.headers.get("Content-Type")}, Content-Length=${sourceResp.headers.get("Content-Length")}`);
console.log(`[handler] ${requestId} S3 target: bucket=${bucket}, region=${region}, endpoint=${endpoint || "(default)"}, key=${key}`);
console.log(`[handler] ${requestId} upload strategy: ${useMultipart ? `multipart (size: ${numericLength || "unknown"})` : `single PUT (${numericLength} bytes)`}`);
if (!sourceResp.ok) {
console.error(`[handler] ${requestId} source returned non-OK: ${sourceResp.status}`);
return jsonResponse(
{ error: `Source returned HTTP ${sourceResp.status}` },
502,
);
}
const aws = makeAwsClient(env);
// ---- Prepare S3 parameters ----
const bucket = env.S3_BUCKET;
const region = env.S3_REGION || "us-west-2";
const endpoint = env.S3_ENDPOINT || "";
const key = objectKeyFromUrl(download_url, key_prefix || "");
const sourceContentType =
sourceResp.headers.get("Content-Type") || "application/octet-stream";
const contentLength = sourceResp.headers.get("Content-Length");
// ---- Upload to S3 ----
const uploadStart = Date.now();
try {
if (!useMultipart) {
// Known size ≤ 100 MiB → single streaming PUT (zero extra memory)
await putObjectStreaming(
aws,
bucket,
region,
key,
sourceResp.body,
contentLength,
sourceContentType,
endpoint,
);
} else {
// Unknown size or > 100 MiB → multipart streaming upload (≤ 5 MiB buffer)
await multipartStreamUpload(
aws,
bucket,
region,
key,
sourceResp.body,
sourceContentType,
endpoint,
);
}
} catch (err) {
const elapsed = ((Date.now() - uploadStart) / 1000).toFixed(1);
console.error(`[handler] ${requestId} S3 upload failed after ${elapsed}s: ${err.message}`);
console.error(`[handler] ${requestId} S3 upload error stack: ${err.stack}`);
return jsonResponse({ error: `S3 upload failed: ${err.message}` }, 502);
}
console.log(`[handler] ${requestId} S3 target: bucket=${bucket}, region=${region}, endpoint=${endpoint || "(default)"}, key=${key}`);
const elapsed = ((Date.now() - uploadStart) / 1000).toFixed(1);
console.log(`[handler] ${requestId} upload completed in ${elapsed}s`);
const aws = makeAwsClient(env);
// ---- Upload to S3 (stream directly, no tee) ----
try {
const uploadResult = await doUpload(aws, bucket, region, key, sourceResp.body, sourceContentType, contentLength, endpoint);
const finishedAt = new Date().toISOString();
const sizeBytes = contentLength ? Number(contentLength) : null;
console.log(`[handler] ${requestId} upload completed, ETag: ${uploadResult.etag}`);
return jsonResponse({
ok: true,
bucket,
key,
content_type: sourceContentType,
...(contentLength ? { size_bytes: Number(contentLength) } : {}),
...(sizeBytes !== null ? { size_bytes: sizeBytes } : {}),
...(uploadResult.etag ? { etag: uploadResult.etag } : {}),
...(uploadResult.useMultipart ? {
multipart_part_size: MIN_PART_SIZE,
multipart_number_parts: uploadResult.totalParts,
} : {}),
started_at: startedAt,
finished_at: finishedAt,
});
},
};
} catch (err) {
const finishedAt = new Date().toISOString();
console.error(`[handler] ${requestId} S3 upload failed: ${err.message}`);
console.error(`[handler] ${requestId} S3 upload error stack: ${err.stack}`);
return jsonResponse({
error: `S3 upload failed: ${err.message}`,
started_at: startedAt,
finished_at: finishedAt,
}, 502);
}
}
// ---------------------------------------------------------------------------
// PUT handler direct file upload to S3
// ---------------------------------------------------------------------------
async function handleDirectUpload(request, env, requestId) {
const startedAt = new Date().toISOString();
// ---- Auth check ----
const authErr = checkAuth(request, env);
if (authErr) {
console.log(`[handler] ${requestId} rejected: unauthorized`);
return authErr;
}
// ---- Validate required header ----
const s3Key = request.headers.get("X-S3-Key");
if (!s3Key) {
console.log(`[handler] ${requestId} rejected: missing X-S3-Key header`);
return jsonResponse({ error: "'X-S3-Key' header is required." }, 400);
}
const contentType = request.headers.get("Content-Type") || "application/octet-stream";
const contentLength = request.headers.get("Content-Length");
console.log(`[handler] ${requestId} direct upload: key=${s3Key}, Content-Type=${contentType}, Content-Length=${contentLength || "(unknown)"}`);
if (!request.body) {
console.log(`[handler] ${requestId} rejected: no body`);
return jsonResponse({ error: "Request body is required." }, 400);
}
// ---- Prepare S3 parameters ----
const bucket = env.S3_BUCKET;
const region = env.S3_REGION || "us-west-2";
const endpoint = env.S3_ENDPOINT || "";
const aws = makeAwsClient(env);
// ---- Upload to S3 (stream directly, no tee) ----
try {
const uploadResult = await doUpload(aws, bucket, region, s3Key, request.body, contentType, contentLength, endpoint);
const finishedAt = new Date().toISOString();
const sizeBytes = contentLength ? Number(contentLength) : null;
console.log(`[handler] ${requestId} direct upload completed, ETag: ${uploadResult.etag}`);
return jsonResponse({
ok: true,
bucket,
key: s3Key,
content_type: contentType,
...(sizeBytes !== null ? { size_bytes: sizeBytes } : {}),
...(uploadResult.etag ? { etag: uploadResult.etag } : {}),
...(uploadResult.useMultipart ? {
multipart_part_size: MIN_PART_SIZE,
multipart_number_parts: uploadResult.totalParts,
} : {}),
started_at: startedAt,
finished_at: finishedAt,
});
} catch (err) {
const finishedAt = new Date().toISOString();
console.error(`[handler] ${requestId} direct upload failed: ${err.message}`);
console.error(`[handler] ${requestId} direct upload error stack: ${err.stack}`);
return jsonResponse({
error: `S3 upload failed: ${err.message}`,
started_at: startedAt,
finished_at: finishedAt,
}, 502);
}
}