diff --git a/scripts/06_call_http_ingestor/README.md b/scripts/06_call_http_ingestor/README.md index 830112a..88db752 100644 --- a/scripts/06_call_http_ingestor/README.md +++ b/scripts/06_call_http_ingestor/README.md @@ -15,7 +15,7 @@ uv run d4c-http-ingestor \ --auth-token "$D4C_INGESTOR_AUTH_TOKEN" \ --db ca-qc_government_and_municipalities_of_quebec-2026A000224_d4c-datapkg-orthoimagery_orthorectified_imagery_from_quebec.sqlite \ --key-prefix dataforcanada/d4c-datapkg-orthoimagery/archive/ca-qc_government_and_municipalities_of_quebec-2026A000224_d4c-datapkg-orthoimagery_orthorectified_imagery_from_quebec \ - --out parquet/ \ + --out ca-qc_government_and_municipalities_of_quebec-2026A000224_d4c-datapkg-orthoimagery_orthorectified_imagery_from_quebec \ --concurrency 12 ``` @@ -26,7 +26,7 @@ The auth token can also be set via the `D4C_INGESTOR_AUTH_TOKEN` environment var ``` usage: d4c-http-ingestor [-h] --urls URLS --dataset-id DATASET_ID [--worker-url WORKER_URL] [--auth-token AUTH_TOKEN] - --db DB [--key-prefix KEY_PREFIX] [--out OUT] + --db DB [--key-prefix KEY_PREFIX] --out OUT [--concurrency CONCURRENCY] [--timeout TIMEOUT] [--max-retries MAX_RETRIES] [--resume | --no-resume] [--force-refresh] @@ -40,7 +40,7 @@ usage: d4c-http-ingestor [-h] --urls URLS --dataset-id DATASET_ID | `--auth-token` | `$D4C_INGESTOR_AUTH_TOKEN` | Bearer token for the worker | | `--db` | *(required)* | Path to the SQLite database file | | `--key-prefix` | `""` | S3 key prefix passed to the worker | -| `--out` | `parquet/` | Output directory for the Parquet artifact | +| `--out` | *(required)* | Parquet output filename stem (e.g. `my-dataset` → `my-dataset.parquet`) | | `--concurrency` | `12` | Maximum concurrent worker requests | | `--timeout` | `600` | Per-request timeout in seconds | | `--max-retries` | `3` | Maximum retry attempts per URL on failure | @@ -61,9 +61,10 @@ usage: d4c-http-ingestor [-h] --urls URLS --dataset-id DATASET_ID "key_prefix": "" } ``` -6. Persists each result (success/failed) to SQLite with idempotent upsert. +6. Persists each result (success/failed) to SQLite with idempotent upsert, including the ETag, multipart info, and timestamps returned by the worker. 7. Failed URLs are retried with exponential backoff + jitter (up to `--max-retries`). -8. On completion, exports the full `downloads` table to `parquet/downloads.parquet`. +8. **Every 100 successful downloads**, the full `downloads` table is exported to `{out}.parquet` and uploaded to S3 via the worker's PUT endpoint at the `--key-prefix` location. +9. On completion, a final Parquet export + upload is performed. Re-runs append new datasets or update existing rows into the Parquet dataset. @@ -73,13 +74,16 @@ Re-runs append new datasets or update existing rows into the Parquet dataset. ```sql CREATE TABLE IF NOT EXISTS downloads ( - url TEXT PRIMARY KEY, - dataset_id TEXT NOT NULL, - status TEXT NOT NULL, -- success | failed | skipped - http_status INTEGER, - error TEXT, - started_at TEXT NOT NULL, - finished_at TEXT + url TEXT PRIMARY KEY, + dataset_id TEXT NOT NULL, + status TEXT NOT NULL, -- success | failed | skipped + http_status INTEGER, + etag TEXT, + error TEXT, + started_at TEXT NOT NULL, + finished_at TEXT, + multipart_part_size INTEGER, + multipart_number_parts INTEGER ); CREATE INDEX IF NOT EXISTS ix_downloads_dataset ON downloads(dataset_id); CREATE INDEX IF NOT EXISTS ix_downloads_status ON downloads(status); @@ -87,7 +91,20 @@ CREATE INDEX IF NOT EXISTS ix_downloads_status ON downloads(status); ### Parquet columns -Mirrors the SQLite schema exactly. +| Column | Arrow Type | Description | +|--------|-----------|-------------| +| `url` | `string` | Source download URL | +| `dataset_id` | `string` | Logical dataset identifier | +| `status` | `string` | `success`, `failed`, or `skipped` | +| `http_status` | `int32` | HTTP status code from the worker | +| `etag` | `string` | S3 ETag of the uploaded object (quotes stripped) | +| `error` | `string` | Error message (if failed) | +| `started_at` | `timestamp[us, tz=UTC]` | When the worker started processing (from worker response) | +| `finished_at` | `timestamp[us, tz=UTC]` | When the worker finished processing (from worker response) | +| `multipart_part_size` | `int32` | S3 multipart part size in bytes (if multipart was used) | +| `multipart_number_parts` | `int32` | Number of parts uploaded (if multipart was used) | + +> Note: `started_at` and `finished_at` are stored as ISO-8601 text in SQLite but converted to proper Arrow timestamps in the Parquet output. These values come from the Cloudflare worker response, not the Python CLI. ## Dependencies diff --git a/scripts/06_call_http_ingestor/src/d4c_http_ingestor/cli.py b/scripts/06_call_http_ingestor/src/d4c_http_ingestor/cli.py index db1f22a..eaf38ce 100644 --- a/scripts/06_call_http_ingestor/src/d4c_http_ingestor/cli.py +++ b/scripts/06_call_http_ingestor/src/d4c_http_ingestor/cli.py @@ -10,7 +10,6 @@ import argparse import asyncio import os import sys -from datetime import datetime, timezone from pathlib import Path import httpx @@ -26,18 +25,19 @@ from rich.progress import ( from d4c_http_ingestor.db import DownloadRow, DownloadsDB from d4c_http_ingestor.parquet import export_parquet -from d4c_http_ingestor.worker import call_worker_with_retries +from d4c_http_ingestor.worker import call_worker_with_retries, upload_file_to_worker console = Console() # --------------------------------------------------------------------------- -# Helpers +# Constants # --------------------------------------------------------------------------- +EXPORT_EVERY_N = 100 # Export + upload Parquet every N successful downloads -def _utcnow() -> str: - """Return the current UTC time as an ISO-8601 string.""" - return datetime.now(timezone.utc).isoformat() +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- def _read_urls(path: str) -> list[str]: @@ -56,6 +56,50 @@ def _read_urls(path: str) -> list[str]: # --------------------------------------------------------------------------- +async def _export_and_upload( + db: DownloadsDB, + client: httpx.AsyncClient, + *, + out_stem: str, + worker_url: str, + auth_token: str, + key_prefix: str, +) -> None: + """Export the SQLite DB to Parquet and upload it to S3 via the worker.""" + rows = db.all_rows() + if not rows: + return + + parquet_path = export_parquet(rows, out_stem) + parquet_filename = parquet_path.name + s3_key = ( + f"{key_prefix.rstrip('/')}/{parquet_filename}" + if key_prefix + else parquet_filename + ) + + console.print( + f" [cyan]Uploading[/] {parquet_filename} → s3://…/{s3_key}" + ) + result = await upload_file_to_worker( + client, + worker_url=worker_url, + auth_token=auth_token, + file_path=parquet_path, + s3_key=s3_key, + content_type="application/vnd.apache.parquet", + ) + if result.ok: + console.print( + f" [green]✓[/] Parquet uploaded ({len(rows)} rows, " + f"{parquet_path.stat().st_size:,} bytes)" + ) + else: + console.print( + f" [red]✗[/] Parquet upload failed: {result.error}" + ) + + async def _process_urls( urls: list[str], *, @@ -64,18 +108,45 @@ async def _process_urls( worker_url: str, auth_token: str, key_prefix: str, + out_stem: str, concurrency: int, timeout: float, max_retries: int, progress: Progress, task_id: int, ) -> None: - """Submit *urls* to the worker with bounded concurrency.""" - sem = asyncio.Semaphore(concurrency) + """Submit *urls* to the worker with bounded concurrency. + + Uses a fixed-size worker pool so that exactly *concurrency* requests + are in-flight at any time. As soon as one request completes, the + next URL is picked up immediately — no idle slots. + + Every :data:`EXPORT_EVERY_N` successful downloads, the SQLite database + is exported to Parquet and uploaded to S3 via the worker's PUT endpoint. + """ + queue: asyncio.Queue[str | None] = asyncio.Queue() + + # Seed the queue with every URL to process. + for url in urls: + queue.put_nowait(url) + + # Sentinel values – one per worker – so they know when to stop. + for _ in range(concurrency): + queue.put_nowait(None) + + # Shared mutable state protected by a lock. + success_count = 0 + export_lock = asyncio.Lock() + + async def _worker(client: httpx.AsyncClient) -> None: + """Pull URLs from the queue until a ``None`` sentinel is received.""" + nonlocal success_count + + while True: + url = await queue.get() + if url is None: + return - async def _handle(client: httpx.AsyncClient, url: str) -> None: - async with sem: - started = _utcnow() user_agent = f"Data for Canada - {dataset_id}" result = await call_worker_with_retries( @@ -89,27 +160,59 @@ async def _process_urls( max_retries=max_retries, ) - finished = _utcnow() + # Use started_at/finished_at from the worker response row = DownloadRow( url=url, dataset_id=dataset_id, status="success" if result.ok else "failed", http_status=result.http_status, + etag=result.etag, error=result.error, - started_at=started, - finished_at=finished, + started_at=result.started_at or "", + finished_at=result.finished_at, + multipart_part_size=result.multipart_part_size, + multipart_number_parts=result.multipart_number_parts, ) db.upsert(row) progress.advance(task_id) - # Use a single shared httpx client with generous limits + # Periodic Parquet export + upload every N successes + if result.ok: + async with export_lock: + success_count += 1 + if success_count % EXPORT_EVERY_N == 0: + console.print( + f"\n [yellow]Checkpoint[/]: {success_count} " + f"successes — exporting Parquet…" + ) + await _export_and_upload( + db, + client, + out_stem=out_stem, + worker_url=worker_url, + auth_token=auth_token, + key_prefix=key_prefix, + ) + + # Use a single shared httpx client with generous limits. limits = httpx.Limits( max_connections=concurrency + 4, max_keepalive_connections=concurrency, ) async with httpx.AsyncClient(limits=limits, follow_redirects=True) as client: - tasks = [asyncio.create_task(_handle(client, u)) for u in urls] - await asyncio.gather(*tasks) + workers = [asyncio.create_task(_worker(client)) for _ in range(concurrency)] + await asyncio.gather(*workers) + + # Final export + upload after all URLs are processed + console.print("\n [yellow]Final export[/]: exporting Parquet…") + await _export_and_upload( + db, + client, + out_stem=out_stem, + worker_url=worker_url, + auth_token=auth_token, + key_prefix=key_prefix, + ) # --------------------------------------------------------------------------- @@ -154,8 +257,12 @@ def build_parser() -> argparse.ArgumentParser: ) p.add_argument( "--out", - default="parquet/", - help="Output directory for the Parquet artifact (default: parquet/).", + required=True, + help=( + "Parquet output filename stem. For example, " + "'--out my-dataset' creates 'my-dataset.parquet' in the " + "current working directory." + ), ) p.add_argument( "--concurrency", @@ -252,6 +359,7 @@ def main(argv: list[str] | None = None) -> None: worker_url=args.worker_url, auth_token=args.auth_token, key_prefix=args.key_prefix, + out_stem=args.out, concurrency=args.concurrency, timeout=args.timeout, max_retries=args.max_retries, @@ -269,7 +377,7 @@ def main(argv: list[str] | None = None) -> None: ) console.print(f" [{colour}]{status}[/]: {cnt}") - # -- Export Parquet -------------------------------------------------- + # -- Final local Parquet export (no upload — already done above) ----- rows = db.all_rows() if rows: dest = export_parquet(rows, args.out) diff --git a/scripts/06_call_http_ingestor/src/d4c_http_ingestor/db.py b/scripts/06_call_http_ingestor/src/d4c_http_ingestor/db.py index a2bf32f..f6b464c 100644 --- a/scripts/06_call_http_ingestor/src/d4c_http_ingestor/db.py +++ b/scripts/06_call_http_ingestor/src/d4c_http_ingestor/db.py @@ -19,9 +19,12 @@ class DownloadRow: dataset_id: str status: str # success | failed | skipped http_status: int | None = None + etag: str | None = None error: str | None = None started_at: str = "" finished_at: str | None = None + multipart_part_size: int | None = None + multipart_number_parts: int | None = None # --------------------------------------------------------------------------- @@ -30,13 +33,16 @@ class DownloadRow: _SCHEMA_SQL = """\ CREATE TABLE IF NOT EXISTS downloads ( - url TEXT PRIMARY KEY, - dataset_id TEXT NOT NULL, - status TEXT NOT NULL, - http_status INTEGER, - error TEXT, - started_at TEXT NOT NULL, - finished_at TEXT + url TEXT PRIMARY KEY, + dataset_id TEXT NOT NULL, + status TEXT NOT NULL, + http_status INTEGER, + etag TEXT, + error TEXT, + started_at TEXT NOT NULL, + finished_at TEXT, + multipart_part_size INTEGER, + multipart_number_parts INTEGER ); CREATE INDEX IF NOT EXISTS ix_downloads_dataset ON downloads(dataset_id); CREATE INDEX IF NOT EXISTS ix_downloads_status ON downloads(status); @@ -82,21 +88,39 @@ class DownloadsDB: ) return [r["url"] for r in cur.fetchall()] + def count_successful(self) -> int: + """Return the number of rows with status ``success``.""" + cur = self._conn.execute( + "SELECT COUNT(*) AS cnt FROM downloads WHERE status = 'success'" + ) + return cur.fetchone()["cnt"] + # -- mutations ----------------------------------------------------------- def upsert(self, row: DownloadRow) -> None: """Insert or replace a download row.""" self._conn.execute( """\ - INSERT INTO downloads (url, dataset_id, status, http_status, error, started_at, finished_at) - VALUES (:url, :dataset_id, :status, :http_status, :error, :started_at, :finished_at) + INSERT INTO downloads ( + url, dataset_id, status, http_status, etag, error, + started_at, finished_at, + multipart_part_size, multipart_number_parts + ) + VALUES ( + :url, :dataset_id, :status, :http_status, :etag, :error, + :started_at, :finished_at, + :multipart_part_size, :multipart_number_parts + ) ON CONFLICT(url) DO UPDATE SET - dataset_id = excluded.dataset_id, - status = excluded.status, - http_status = excluded.http_status, - error = excluded.error, - started_at = excluded.started_at, - finished_at = excluded.finished_at + dataset_id = excluded.dataset_id, + status = excluded.status, + http_status = excluded.http_status, + etag = excluded.etag, + error = excluded.error, + started_at = excluded.started_at, + finished_at = excluded.finished_at, + multipart_part_size = excluded.multipart_part_size, + multipart_number_parts = excluded.multipart_number_parts """, row.__dict__, ) diff --git a/scripts/06_call_http_ingestor/src/d4c_http_ingestor/parquet.py b/scripts/06_call_http_ingestor/src/d4c_http_ingestor/parquet.py index bcc5134..0039b98 100644 --- a/scripts/06_call_http_ingestor/src/d4c_http_ingestor/parquet.py +++ b/scripts/06_call_http_ingestor/src/d4c_http_ingestor/parquet.py @@ -2,6 +2,7 @@ from __future__ import annotations +from datetime import datetime from pathlib import Path import pyarrow as pa @@ -9,6 +10,18 @@ import pyarrow.parquet as pq from d4c_http_ingestor.db import DownloadRow +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _parse_iso(s: str | None) -> datetime | None: + """Parse an ISO-8601 string into a datetime, or return None.""" + if not s: + return None + return datetime.fromisoformat(s) + + # --------------------------------------------------------------------------- # Schema – mirrors the SQLite ``downloads`` table # --------------------------------------------------------------------------- @@ -19,9 +32,12 @@ _ARROW_SCHEMA = pa.schema( pa.field("dataset_id", pa.string(), nullable=False), pa.field("status", pa.string(), nullable=False), pa.field("http_status", pa.int32(), nullable=True), + pa.field("etag", pa.string(), nullable=True), pa.field("error", pa.string(), nullable=True), - pa.field("started_at", pa.string(), nullable=False), - pa.field("finished_at", pa.string(), nullable=True), + pa.field("started_at", pa.timestamp("us", tz="UTC"), nullable=True), + pa.field("finished_at", pa.timestamp("us", tz="UTC"), nullable=True), + pa.field("multipart_part_size", pa.int32(), nullable=True), + pa.field("multipart_number_parts", pa.int32(), nullable=True), ] ) @@ -33,25 +49,31 @@ def rows_to_table(rows: list[DownloadRow]) -> pa.Table: pa.array([r.dataset_id for r in rows], type=pa.string()), pa.array([r.status for r in rows], type=pa.string()), pa.array([r.http_status for r in rows], type=pa.int32()), + pa.array([r.etag for r in rows], type=pa.string()), pa.array([r.error for r in rows], type=pa.string()), - pa.array([r.started_at for r in rows], type=pa.string()), - pa.array([r.finished_at for r in rows], type=pa.string()), + pa.array( + [_parse_iso(r.started_at) for r in rows], + type=pa.timestamp("us", tz="UTC"), + ), + pa.array( + [_parse_iso(r.finished_at) for r in rows], + type=pa.timestamp("us", tz="UTC"), + ), + pa.array([r.multipart_part_size for r in rows], type=pa.int32()), + pa.array([r.multipart_number_parts for r in rows], type=pa.int32()), ] return pa.table(arrays, schema=_ARROW_SCHEMA) -def export_parquet(rows: list[DownloadRow], out_dir: str | Path) -> Path: - """Write *rows* as a single Parquet file inside *out_dir*. +def export_parquet(rows: list[DownloadRow], out_stem: str) -> Path: + """Write *rows* as a Parquet file named ``{out_stem}.parquet`` in the CWD. - The file is named ``downloads.parquet`` and is overwritten on each run so - that re-runs always reflect the latest state of the SQLite database. + The file is overwritten on each call so that re-runs always reflect the + latest state of the SQLite database. Returns the path to the written file. """ - out_dir = Path(out_dir) - out_dir.mkdir(parents=True, exist_ok=True) - dest = out_dir / "downloads.parquet" - + dest = Path(f"{out_stem}.parquet") table = rows_to_table(rows) pq.write_table(table, dest, compression="zstd") return dest diff --git a/scripts/06_call_http_ingestor/src/d4c_http_ingestor/worker.py b/scripts/06_call_http_ingestor/src/d4c_http_ingestor/worker.py index d5b3d34..a4c22f9 100644 --- a/scripts/06_call_http_ingestor/src/d4c_http_ingestor/worker.py +++ b/scripts/06_call_http_ingestor/src/d4c_http_ingestor/worker.py @@ -5,6 +5,8 @@ from __future__ import annotations import asyncio import random from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path from typing import Any import httpx @@ -21,7 +23,12 @@ class WorkerResult: key: str | None = None content_type: str | None = None size_bytes: int | None = None + etag: str | None = None error: str | None = None + multipart_part_size: int | None = None + multipart_number_parts: int | None = None + started_at: str | None = None + finished_at: str | None = None async def call_worker( @@ -49,6 +56,8 @@ async def call_worker( "Content-Type": "application/json", } + started = datetime.now(timezone.utc).isoformat() + try: resp = await client.post( worker_url, @@ -67,6 +76,11 @@ async def call_worker( key=body.get("key"), content_type=body.get("content_type"), size_bytes=body.get("size_bytes"), + etag=body.get("etag"), + multipart_part_size=body.get("multipart_part_size"), + multipart_number_parts=body.get("multipart_number_parts"), + started_at=body.get("started_at"), + finished_at=body.get("finished_at"), ) else: return WorkerResult( @@ -74,13 +88,15 @@ async def call_worker( ok=False, http_status=resp.status_code, error=body.get("error", resp.text), + started_at=body.get("started_at"), + finished_at=body.get("finished_at"), ) except httpx.TimeoutException as exc: - return WorkerResult(url=download_url, ok=False, error=f"Timeout: {exc}") + return WorkerResult(url=download_url, ok=False, error=f"Timeout: {exc}", started_at=started) except httpx.HTTPError as exc: - return WorkerResult(url=download_url, ok=False, error=f"HTTP error: {exc}") + return WorkerResult(url=download_url, ok=False, error=f"HTTP error: {exc}", started_at=started) except Exception as exc: # noqa: BLE001 - return WorkerResult(url=download_url, ok=False, error=str(exc)) + return WorkerResult(url=download_url, ok=False, error=str(exc), started_at=started) async def call_worker_with_retries( @@ -121,3 +137,71 @@ async def call_worker_with_retries( assert last_result is not None # noqa: S101 return last_result + + +async def upload_file_to_worker( + client: httpx.AsyncClient, + *, + worker_url: str, + auth_token: str, + file_path: Path, + s3_key: str, + content_type: str = "application/octet-stream", + timeout: float = 120.0, +) -> WorkerResult: + """Upload a local file directly to S3 via the worker PUT endpoint. + + Reads the file and sends it as a PUT request body with the ``X-S3-Key`` + header specifying the destination S3 object key. + """ + headers = { + "Authorization": f"Bearer {auth_token}", + "X-S3-Key": s3_key, + "Content-Type": content_type, + } + + try: + file_size = file_path.stat().st_size + headers["Content-Length"] = str(file_size) + + with open(file_path, "rb") as fh: + file_bytes = fh.read() + + resp = await client.put( + worker_url, + content=file_bytes, + headers=headers, + timeout=timeout, + ) + body: dict[str, Any] = resp.json() + + if resp.is_success and body.get("ok"): + return WorkerResult( + url=str(file_path), + ok=True, + http_status=resp.status_code, + bucket=body.get("bucket"), + key=body.get("key"), + content_type=body.get("content_type"), + size_bytes=body.get("size_bytes"), + etag=body.get("etag"), + multipart_part_size=body.get("multipart_part_size"), + multipart_number_parts=body.get("multipart_number_parts"), + started_at=body.get("started_at"), + finished_at=body.get("finished_at"), + ) + else: + return WorkerResult( + url=str(file_path), + ok=False, + http_status=resp.status_code, + error=body.get("error", resp.text), + started_at=body.get("started_at"), + finished_at=body.get("finished_at"), + ) + except httpx.TimeoutException as exc: + return WorkerResult(url=str(file_path), ok=False, error=f"Timeout: {exc}") + except httpx.HTTPError as exc: + return WorkerResult(url=str(file_path), ok=False, error=f"HTTP error: {exc}") + except Exception as exc: # noqa: BLE001 + return WorkerResult(url=str(file_path), ok=False, error=str(exc))