Update Python CLI to call Cloudflare HTTP ingestor. TODO: For singlepart upload, return back part size and number of parts. More descriptive error logged

This commit is contained in:
Diego Ripley
2026-03-13 07:18:36 -04:00
parent d4484b665f
commit 422cff5273
5 changed files with 318 additions and 63 deletions
+30 -13
View File
@@ -15,7 +15,7 @@ uv run d4c-http-ingestor \
--auth-token "$D4C_INGESTOR_AUTH_TOKEN" \
--db ca-qc_government_and_municipalities_of_quebec-2026A000224_d4c-datapkg-orthoimagery_orthorectified_imagery_from_quebec.sqlite \
--key-prefix dataforcanada/d4c-datapkg-orthoimagery/archive/ca-qc_government_and_municipalities_of_quebec-2026A000224_d4c-datapkg-orthoimagery_orthorectified_imagery_from_quebec \
--out parquet/ \
--out ca-qc_government_and_municipalities_of_quebec-2026A000224_d4c-datapkg-orthoimagery_orthorectified_imagery_from_quebec \
--concurrency 12
```
@@ -26,7 +26,7 @@ The auth token can also be set via the `D4C_INGESTOR_AUTH_TOKEN` environment var
```
usage: d4c-http-ingestor [-h] --urls URLS --dataset-id DATASET_ID
[--worker-url WORKER_URL] [--auth-token AUTH_TOKEN]
--db DB [--key-prefix KEY_PREFIX] [--out OUT]
--db DB [--key-prefix KEY_PREFIX] --out OUT
[--concurrency CONCURRENCY] [--timeout TIMEOUT]
[--max-retries MAX_RETRIES]
[--resume | --no-resume] [--force-refresh]
@@ -40,7 +40,7 @@ usage: d4c-http-ingestor [-h] --urls URLS --dataset-id DATASET_ID
| `--auth-token` | `$D4C_INGESTOR_AUTH_TOKEN` | Bearer token for the worker |
| `--db` | *(required)* | Path to the SQLite database file |
| `--key-prefix` | `""` | S3 key prefix passed to the worker |
| `--out` | `parquet/` | Output directory for the Parquet artifact |
| `--out` | *(required)* | Parquet output filename stem (e.g. `my-dataset``my-dataset.parquet`) |
| `--concurrency` | `12` | Maximum concurrent worker requests |
| `--timeout` | `600` | Per-request timeout in seconds |
| `--max-retries` | `3` | Maximum retry attempts per URL on failure |
@@ -61,9 +61,10 @@ usage: d4c-http-ingestor [-h] --urls URLS --dataset-id DATASET_ID
"key_prefix": "<key-prefix>"
}
```
6. Persists each result (success/failed) to SQLite with idempotent upsert.
6. Persists each result (success/failed) to SQLite with idempotent upsert, including the ETag, multipart info, and timestamps returned by the worker.
7. Failed URLs are retried with exponential backoff + jitter (up to `--max-retries`).
8. On completion, exports the full `downloads` table to `parquet/downloads.parquet`.
8. **Every 100 successful downloads**, the full `downloads` table is exported to `{out}.parquet` and uploaded to S3 via the worker's PUT endpoint at the `--key-prefix` location.
9. On completion, a final Parquet export + upload is performed.
Re-runs append new datasets or update existing rows into the Parquet dataset.
@@ -73,13 +74,16 @@ Re-runs append new datasets or update existing rows into the Parquet dataset.
```sql
CREATE TABLE IF NOT EXISTS downloads (
url TEXT PRIMARY KEY,
dataset_id TEXT NOT NULL,
status TEXT NOT NULL, -- success | failed | skipped
http_status INTEGER,
error TEXT,
started_at TEXT NOT NULL,
finished_at TEXT
url TEXT PRIMARY KEY,
dataset_id TEXT NOT NULL,
status TEXT NOT NULL, -- success | failed | skipped
http_status INTEGER,
etag TEXT,
error TEXT,
started_at TEXT NOT NULL,
finished_at TEXT,
multipart_part_size INTEGER,
multipart_number_parts INTEGER
);
CREATE INDEX IF NOT EXISTS ix_downloads_dataset ON downloads(dataset_id);
CREATE INDEX IF NOT EXISTS ix_downloads_status ON downloads(status);
@@ -87,7 +91,20 @@ CREATE INDEX IF NOT EXISTS ix_downloads_status ON downloads(status);
### Parquet columns
Mirrors the SQLite schema exactly.
| Column | Arrow Type | Description |
|--------|-----------|-------------|
| `url` | `string` | Source download URL |
| `dataset_id` | `string` | Logical dataset identifier |
| `status` | `string` | `success`, `failed`, or `skipped` |
| `http_status` | `int32` | HTTP status code from the worker |
| `etag` | `string` | S3 ETag of the uploaded object (quotes stripped) |
| `error` | `string` | Error message (if failed) |
| `started_at` | `timestamp[us, tz=UTC]` | When the worker started processing (from worker response) |
| `finished_at` | `timestamp[us, tz=UTC]` | When the worker finished processing (from worker response) |
| `multipart_part_size` | `int32` | S3 multipart part size in bytes (if multipart was used) |
| `multipart_number_parts` | `int32` | Number of parts uploaded (if multipart was used) |
> Note: `started_at` and `finished_at` are stored as ISO-8601 text in SQLite but converted to proper Arrow timestamps in the Parquet output. These values come from the Cloudflare worker response, not the Python CLI.
## Dependencies
@@ -10,7 +10,6 @@ import argparse
import asyncio
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
import httpx
@@ -26,18 +25,19 @@ from rich.progress import (
from d4c_http_ingestor.db import DownloadRow, DownloadsDB
from d4c_http_ingestor.parquet import export_parquet
from d4c_http_ingestor.worker import call_worker_with_retries
from d4c_http_ingestor.worker import call_worker_with_retries, upload_file_to_worker
console = Console()
# ---------------------------------------------------------------------------
# Helpers
# Constants
# ---------------------------------------------------------------------------
EXPORT_EVERY_N = 100 # Export + upload Parquet every N successful downloads
def _utcnow() -> str:
"""Return the current UTC time as an ISO-8601 string."""
return datetime.now(timezone.utc).isoformat()
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _read_urls(path: str) -> list[str]:
@@ -56,6 +56,50 @@ def _read_urls(path: str) -> list[str]:
# ---------------------------------------------------------------------------
async def _export_and_upload(
db: DownloadsDB,
client: httpx.AsyncClient,
*,
out_stem: str,
worker_url: str,
auth_token: str,
key_prefix: str,
) -> None:
"""Export the SQLite DB to Parquet and upload it to S3 via the worker."""
rows = db.all_rows()
if not rows:
return
parquet_path = export_parquet(rows, out_stem)
parquet_filename = parquet_path.name
s3_key = (
f"{key_prefix.rstrip('/')}/{parquet_filename}"
if key_prefix
else parquet_filename
)
console.print(
f" [cyan]Uploading[/] {parquet_filename} → s3://…/{s3_key}"
)
result = await upload_file_to_worker(
client,
worker_url=worker_url,
auth_token=auth_token,
file_path=parquet_path,
s3_key=s3_key,
content_type="application/vnd.apache.parquet",
)
if result.ok:
console.print(
f" [green]✓[/] Parquet uploaded ({len(rows)} rows, "
f"{parquet_path.stat().st_size:,} bytes)"
)
else:
console.print(
f" [red]✗[/] Parquet upload failed: {result.error}"
)
async def _process_urls(
urls: list[str],
*,
@@ -64,18 +108,45 @@ async def _process_urls(
worker_url: str,
auth_token: str,
key_prefix: str,
out_stem: str,
concurrency: int,
timeout: float,
max_retries: int,
progress: Progress,
task_id: int,
) -> None:
"""Submit *urls* to the worker with bounded concurrency."""
sem = asyncio.Semaphore(concurrency)
"""Submit *urls* to the worker with bounded concurrency.
Uses a fixed-size worker pool so that exactly *concurrency* requests
are in-flight at any time. As soon as one request completes, the
next URL is picked up immediately — no idle slots.
Every :data:`EXPORT_EVERY_N` successful downloads, the SQLite database
is exported to Parquet and uploaded to S3 via the worker's PUT endpoint.
"""
queue: asyncio.Queue[str | None] = asyncio.Queue()
# Seed the queue with every URL to process.
for url in urls:
queue.put_nowait(url)
# Sentinel values one per worker so they know when to stop.
for _ in range(concurrency):
queue.put_nowait(None)
# Shared mutable state protected by a lock.
success_count = 0
export_lock = asyncio.Lock()
async def _worker(client: httpx.AsyncClient) -> None:
"""Pull URLs from the queue until a ``None`` sentinel is received."""
nonlocal success_count
while True:
url = await queue.get()
if url is None:
return
async def _handle(client: httpx.AsyncClient, url: str) -> None:
async with sem:
started = _utcnow()
user_agent = f"Data for Canada - {dataset_id}"
result = await call_worker_with_retries(
@@ -89,27 +160,59 @@ async def _process_urls(
max_retries=max_retries,
)
finished = _utcnow()
# Use started_at/finished_at from the worker response
row = DownloadRow(
url=url,
dataset_id=dataset_id,
status="success" if result.ok else "failed",
http_status=result.http_status,
etag=result.etag,
error=result.error,
started_at=started,
finished_at=finished,
started_at=result.started_at or "",
finished_at=result.finished_at,
multipart_part_size=result.multipart_part_size,
multipart_number_parts=result.multipart_number_parts,
)
db.upsert(row)
progress.advance(task_id)
# Use a single shared httpx client with generous limits
# Periodic Parquet export + upload every N successes
if result.ok:
async with export_lock:
success_count += 1
if success_count % EXPORT_EVERY_N == 0:
console.print(
f"\n [yellow]Checkpoint[/]: {success_count} "
f"successes — exporting Parquet…"
)
await _export_and_upload(
db,
client,
out_stem=out_stem,
worker_url=worker_url,
auth_token=auth_token,
key_prefix=key_prefix,
)
# Use a single shared httpx client with generous limits.
limits = httpx.Limits(
max_connections=concurrency + 4,
max_keepalive_connections=concurrency,
)
async with httpx.AsyncClient(limits=limits, follow_redirects=True) as client:
tasks = [asyncio.create_task(_handle(client, u)) for u in urls]
await asyncio.gather(*tasks)
workers = [asyncio.create_task(_worker(client)) for _ in range(concurrency)]
await asyncio.gather(*workers)
# Final export + upload after all URLs are processed
console.print("\n [yellow]Final export[/]: exporting Parquet…")
await _export_and_upload(
db,
client,
out_stem=out_stem,
worker_url=worker_url,
auth_token=auth_token,
key_prefix=key_prefix,
)
# ---------------------------------------------------------------------------
@@ -154,8 +257,12 @@ def build_parser() -> argparse.ArgumentParser:
)
p.add_argument(
"--out",
default="parquet/",
help="Output directory for the Parquet artifact (default: parquet/).",
required=True,
help=(
"Parquet output filename stem. For example, "
"'--out my-dataset' creates 'my-dataset.parquet' in the "
"current working directory."
),
)
p.add_argument(
"--concurrency",
@@ -252,6 +359,7 @@ def main(argv: list[str] | None = None) -> None:
worker_url=args.worker_url,
auth_token=args.auth_token,
key_prefix=args.key_prefix,
out_stem=args.out,
concurrency=args.concurrency,
timeout=args.timeout,
max_retries=args.max_retries,
@@ -269,7 +377,7 @@ def main(argv: list[str] | None = None) -> None:
)
console.print(f" [{colour}]{status}[/]: {cnt}")
# -- Export Parquet --------------------------------------------------
# -- Final local Parquet export (no upload — already done above) -----
rows = db.all_rows()
if rows:
dest = export_parquet(rows, args.out)
@@ -19,9 +19,12 @@ class DownloadRow:
dataset_id: str
status: str # success | failed | skipped
http_status: int | None = None
etag: str | None = None
error: str | None = None
started_at: str = ""
finished_at: str | None = None
multipart_part_size: int | None = None
multipart_number_parts: int | None = None
# ---------------------------------------------------------------------------
@@ -30,13 +33,16 @@ class DownloadRow:
_SCHEMA_SQL = """\
CREATE TABLE IF NOT EXISTS downloads (
url TEXT PRIMARY KEY,
dataset_id TEXT NOT NULL,
status TEXT NOT NULL,
http_status INTEGER,
error TEXT,
started_at TEXT NOT NULL,
finished_at TEXT
url TEXT PRIMARY KEY,
dataset_id TEXT NOT NULL,
status TEXT NOT NULL,
http_status INTEGER,
etag TEXT,
error TEXT,
started_at TEXT NOT NULL,
finished_at TEXT,
multipart_part_size INTEGER,
multipart_number_parts INTEGER
);
CREATE INDEX IF NOT EXISTS ix_downloads_dataset ON downloads(dataset_id);
CREATE INDEX IF NOT EXISTS ix_downloads_status ON downloads(status);
@@ -82,21 +88,39 @@ class DownloadsDB:
)
return [r["url"] for r in cur.fetchall()]
def count_successful(self) -> int:
"""Return the number of rows with status ``success``."""
cur = self._conn.execute(
"SELECT COUNT(*) AS cnt FROM downloads WHERE status = 'success'"
)
return cur.fetchone()["cnt"]
# -- mutations -----------------------------------------------------------
def upsert(self, row: DownloadRow) -> None:
"""Insert or replace a download row."""
self._conn.execute(
"""\
INSERT INTO downloads (url, dataset_id, status, http_status, error, started_at, finished_at)
VALUES (:url, :dataset_id, :status, :http_status, :error, :started_at, :finished_at)
INSERT INTO downloads (
url, dataset_id, status, http_status, etag, error,
started_at, finished_at,
multipart_part_size, multipart_number_parts
)
VALUES (
:url, :dataset_id, :status, :http_status, :etag, :error,
:started_at, :finished_at,
:multipart_part_size, :multipart_number_parts
)
ON CONFLICT(url) DO UPDATE SET
dataset_id = excluded.dataset_id,
status = excluded.status,
http_status = excluded.http_status,
error = excluded.error,
started_at = excluded.started_at,
finished_at = excluded.finished_at
dataset_id = excluded.dataset_id,
status = excluded.status,
http_status = excluded.http_status,
etag = excluded.etag,
error = excluded.error,
started_at = excluded.started_at,
finished_at = excluded.finished_at,
multipart_part_size = excluded.multipart_part_size,
multipart_number_parts = excluded.multipart_number_parts
""",
row.__dict__,
)
@@ -2,6 +2,7 @@
from __future__ import annotations
from datetime import datetime
from pathlib import Path
import pyarrow as pa
@@ -9,6 +10,18 @@ import pyarrow.parquet as pq
from d4c_http_ingestor.db import DownloadRow
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _parse_iso(s: str | None) -> datetime | None:
"""Parse an ISO-8601 string into a datetime, or return None."""
if not s:
return None
return datetime.fromisoformat(s)
# ---------------------------------------------------------------------------
# Schema mirrors the SQLite ``downloads`` table
# ---------------------------------------------------------------------------
@@ -19,9 +32,12 @@ _ARROW_SCHEMA = pa.schema(
pa.field("dataset_id", pa.string(), nullable=False),
pa.field("status", pa.string(), nullable=False),
pa.field("http_status", pa.int32(), nullable=True),
pa.field("etag", pa.string(), nullable=True),
pa.field("error", pa.string(), nullable=True),
pa.field("started_at", pa.string(), nullable=False),
pa.field("finished_at", pa.string(), nullable=True),
pa.field("started_at", pa.timestamp("us", tz="UTC"), nullable=True),
pa.field("finished_at", pa.timestamp("us", tz="UTC"), nullable=True),
pa.field("multipart_part_size", pa.int32(), nullable=True),
pa.field("multipart_number_parts", pa.int32(), nullable=True),
]
)
@@ -33,25 +49,31 @@ def rows_to_table(rows: list[DownloadRow]) -> pa.Table:
pa.array([r.dataset_id for r in rows], type=pa.string()),
pa.array([r.status for r in rows], type=pa.string()),
pa.array([r.http_status for r in rows], type=pa.int32()),
pa.array([r.etag for r in rows], type=pa.string()),
pa.array([r.error for r in rows], type=pa.string()),
pa.array([r.started_at for r in rows], type=pa.string()),
pa.array([r.finished_at for r in rows], type=pa.string()),
pa.array(
[_parse_iso(r.started_at) for r in rows],
type=pa.timestamp("us", tz="UTC"),
),
pa.array(
[_parse_iso(r.finished_at) for r in rows],
type=pa.timestamp("us", tz="UTC"),
),
pa.array([r.multipart_part_size for r in rows], type=pa.int32()),
pa.array([r.multipart_number_parts for r in rows], type=pa.int32()),
]
return pa.table(arrays, schema=_ARROW_SCHEMA)
def export_parquet(rows: list[DownloadRow], out_dir: str | Path) -> Path:
"""Write *rows* as a single Parquet file inside *out_dir*.
def export_parquet(rows: list[DownloadRow], out_stem: str) -> Path:
"""Write *rows* as a Parquet file named ``{out_stem}.parquet`` in the CWD.
The file is named ``downloads.parquet`` and is overwritten on each run so
that re-runs always reflect the latest state of the SQLite database.
The file is overwritten on each call so that re-runs always reflect the
latest state of the SQLite database.
Returns the path to the written file.
"""
out_dir = Path(out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
dest = out_dir / "downloads.parquet"
dest = Path(f"{out_stem}.parquet")
table = rows_to_table(rows)
pq.write_table(table, dest, compression="zstd")
return dest
@@ -5,6 +5,8 @@ from __future__ import annotations
import asyncio
import random
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import httpx
@@ -21,7 +23,12 @@ class WorkerResult:
key: str | None = None
content_type: str | None = None
size_bytes: int | None = None
etag: str | None = None
error: str | None = None
multipart_part_size: int | None = None
multipart_number_parts: int | None = None
started_at: str | None = None
finished_at: str | None = None
async def call_worker(
@@ -49,6 +56,8 @@ async def call_worker(
"Content-Type": "application/json",
}
started = datetime.now(timezone.utc).isoformat()
try:
resp = await client.post(
worker_url,
@@ -67,6 +76,11 @@ async def call_worker(
key=body.get("key"),
content_type=body.get("content_type"),
size_bytes=body.get("size_bytes"),
etag=body.get("etag"),
multipart_part_size=body.get("multipart_part_size"),
multipart_number_parts=body.get("multipart_number_parts"),
started_at=body.get("started_at"),
finished_at=body.get("finished_at"),
)
else:
return WorkerResult(
@@ -74,13 +88,15 @@ async def call_worker(
ok=False,
http_status=resp.status_code,
error=body.get("error", resp.text),
started_at=body.get("started_at"),
finished_at=body.get("finished_at"),
)
except httpx.TimeoutException as exc:
return WorkerResult(url=download_url, ok=False, error=f"Timeout: {exc}")
return WorkerResult(url=download_url, ok=False, error=f"Timeout: {exc}", started_at=started)
except httpx.HTTPError as exc:
return WorkerResult(url=download_url, ok=False, error=f"HTTP error: {exc}")
return WorkerResult(url=download_url, ok=False, error=f"HTTP error: {exc}", started_at=started)
except Exception as exc: # noqa: BLE001
return WorkerResult(url=download_url, ok=False, error=str(exc))
return WorkerResult(url=download_url, ok=False, error=str(exc), started_at=started)
async def call_worker_with_retries(
@@ -121,3 +137,71 @@ async def call_worker_with_retries(
assert last_result is not None # noqa: S101
return last_result
async def upload_file_to_worker(
client: httpx.AsyncClient,
*,
worker_url: str,
auth_token: str,
file_path: Path,
s3_key: str,
content_type: str = "application/octet-stream",
timeout: float = 120.0,
) -> WorkerResult:
"""Upload a local file directly to S3 via the worker PUT endpoint.
Reads the file and sends it as a PUT request body with the ``X-S3-Key``
header specifying the destination S3 object key.
"""
headers = {
"Authorization": f"Bearer {auth_token}",
"X-S3-Key": s3_key,
"Content-Type": content_type,
}
try:
file_size = file_path.stat().st_size
headers["Content-Length"] = str(file_size)
with open(file_path, "rb") as fh:
file_bytes = fh.read()
resp = await client.put(
worker_url,
content=file_bytes,
headers=headers,
timeout=timeout,
)
body: dict[str, Any] = resp.json()
if resp.is_success and body.get("ok"):
return WorkerResult(
url=str(file_path),
ok=True,
http_status=resp.status_code,
bucket=body.get("bucket"),
key=body.get("key"),
content_type=body.get("content_type"),
size_bytes=body.get("size_bytes"),
etag=body.get("etag"),
multipart_part_size=body.get("multipart_part_size"),
multipart_number_parts=body.get("multipart_number_parts"),
started_at=body.get("started_at"),
finished_at=body.get("finished_at"),
)
else:
return WorkerResult(
url=str(file_path),
ok=False,
http_status=resp.status_code,
error=body.get("error", resp.text),
started_at=body.get("started_at"),
finished_at=body.get("finished_at"),
)
except httpx.TimeoutException as exc:
return WorkerResult(url=str(file_path), ok=False, error=f"Timeout: {exc}")
except httpx.HTTPError as exc:
return WorkerResult(url=str(file_path), ok=False, error=f"HTTP error: {exc}")
except Exception as exc: # noqa: BLE001
return WorkerResult(url=str(file_path), ok=False, error=str(exc))