Made improvements

This commit is contained in:
Diego Ripley
2026-03-14 07:34:18 -04:00
parent 9c35374b04
commit c3f0fb641b
4 changed files with 31840 additions and 31751 deletions
+2 -1
View File
@@ -1,3 +1,4 @@
*.pmtiles *.pmtiles
.venv/ .venv/
.env .env
*.sqlite
@@ -18,10 +18,10 @@ S3_BUCKET = "us-west-2.opendata.source.coop"
S3_REGION = "us-west-2" S3_REGION = "us-west-2"
S3_ENDPOINT = "" S3_ENDPOINT = ""
[placement]
# TODO: parametize so user can specifically set where they want the worker to run # TODO: parametize so user can specifically set where they want the worker to run
mode = "smart" [placement]
#hostname = "diffusion.mern.gouv.qc.ca" hostname = "diffusion.mern.gouv.qc.ca"
#mode = "smart"
[observability] [observability]
enabled = true enabled = true
@@ -3,6 +3,7 @@
from __future__ import annotations from __future__ import annotations
import sqlite3 import sqlite3
import sys
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import Sequence from typing import Sequence
@@ -48,6 +49,77 @@ CREATE INDEX IF NOT EXISTS ix_downloads_dataset ON downloads(dataset_id);
CREATE INDEX IF NOT EXISTS ix_downloads_status ON downloads(status); CREATE INDEX IF NOT EXISTS ix_downloads_status ON downloads(status);
""" """
# Columns in the canonical schema (order matters for the migration INSERT).
_CANONICAL_COLUMNS = [
"url", "dataset_id", "status", "http_status", "etag", "error",
"started_at", "finished_at", "multipart_part_size", "multipart_number_parts",
]
# ---------------------------------------------------------------------------
# Migration helpers
# ---------------------------------------------------------------------------
def _table_has_primary_key(conn: sqlite3.Connection, table: str) -> bool:
"""Return ``True`` if *table* has at least one column marked as PK."""
cur = conn.execute(f"PRAGMA table_info({table})")
for row in cur.fetchall():
if row["pk"]: # pk column is non-zero for PK members
return True
return False
def _existing_columns(conn: sqlite3.Connection, table: str) -> list[str]:
"""Return the column names of *table* in definition order."""
cur = conn.execute(f"PRAGMA table_info({table})")
return [row["name"] for row in cur.fetchall()]
def _migrate_downloads(conn: sqlite3.Connection) -> None:
"""Recreate the ``downloads`` table with the correct PRIMARY KEY.
This handles databases that were originally created by an external tool
(e.g. pandas / DuckDB) where ``url`` was defined as plain ``VARCHAR``
without a PRIMARY KEY constraint. Existing data is preserved; duplicate
``url`` values (if any) are collapsed by keeping the most recent row
(highest ``rowid``).
"""
old_cols = _existing_columns(conn, "downloads")
# Only copy columns that exist in both old and new schemas.
shared = [c for c in _CANONICAL_COLUMNS if c in old_cols]
cols_csv = ", ".join(shared)
conn.executescript(f"""\
BEGIN;
ALTER TABLE downloads RENAME TO _downloads_old;
CREATE TABLE downloads (
url TEXT PRIMARY KEY,
dataset_id TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT '',
http_status INTEGER,
etag TEXT,
error TEXT,
started_at TEXT NOT NULL DEFAULT '',
finished_at TEXT,
multipart_part_size INTEGER,
multipart_number_parts INTEGER
);
INSERT OR IGNORE INTO downloads ({cols_csv})
SELECT {cols_csv}
FROM _downloads_old
ORDER BY rowid DESC;
DROP TABLE _downloads_old;
CREATE INDEX IF NOT EXISTS ix_downloads_dataset ON downloads(dataset_id);
CREATE INDEX IF NOT EXISTS ix_downloads_status ON downloads(status);
COMMIT;
""")
print(
f"[migrate] Recreated 'downloads' table with PRIMARY KEY(url); "
f"copied columns: {cols_csv}",
file=sys.stderr,
)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Database handle # Database handle
@@ -62,6 +134,22 @@ class DownloadsDB:
self._conn = sqlite3.connect(str(self.path)) self._conn = sqlite3.connect(str(self.path))
self._conn.row_factory = sqlite3.Row self._conn.row_factory = sqlite3.Row
self._conn.execute("PRAGMA journal_mode=WAL") self._conn.execute("PRAGMA journal_mode=WAL")
# --- Schema migration ---------------------------------------------------
# If the table already exists but was created without a PRIMARY KEY
# (e.g. by pandas/DuckDB), recreate it with the correct schema.
cur = self._conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='downloads'"
)
if cur.fetchone() is not None:
if not _table_has_primary_key(self._conn, "downloads"):
print(
"[migrate] Detected 'downloads' table without PRIMARY KEY — migrating…",
file=sys.stderr,
)
_migrate_downloads(self._conn)
# ------------------------------------------------------------------------
self._conn.executescript(_SCHEMA_SQL) self._conn.executescript(_SCHEMA_SQL)
# -- queries ------------------------------------------------------------- # -- queries -------------------------------------------------------------