Diego Ripley
2026-02-19 17:08:15 -05:00
commit d6d132e97b
+403
View File
@@ -0,0 +1,403 @@
"""
TODO:
- Modularize code
- Improve documentation
- Create test cases
"""
from datetime import datetime
import glob
from multiprocessing import Pool
import json
import logging
import os
import sqlite3
import zipfile
from zoneinfo import ZoneInfo
import pandas as pd
"""
TODO: Look into whether polars CSV parsing has improved since 2025-06-19.
I encountered a bunch of issues with properly parsing StatCan CSVs
"""
import polars as pl
import requests
from tqdm import tqdm
data_folder = "/data/tables"
input_folder = f"{data_folder}/input"
scratch_folder = f"{data_folder}/scratch"
output_folder = f"{data_folder}/output"
"""
TODO: Do logging properly
"""
import logging
logging.basicConfig(
filename="processing.log",
encoding="utf-8",
filemode="a",
format="{asctime} - {levelname} - {message}",
style="{",
datefmt="%Y-%m-%d %H:%M",
)
"""
TODO: Do this properly. These are the requirements:
- Lightweight way for users to keep track what StatCan tables they have
downloaded, and for them to update the table when new data is available
"""
if not os.path.exists(f"{data_folder}/processing.db"):
con = sqlite3.connect(f"{data_folder}/processing.db", timeout=60.0)
cur = con.cursor()
cur.executescript("""
CREATE TABLE IF NOT EXISTS downloaded (
product_id TEXT PRIMARY KEY,
last_updated TEXT,
last_processed TEXT
);
CREATE TABLE IF NOT EXISTS cubes (
product_id TEXT PRIMARY KEY,
last_updated TEXT
);
""")
con.commit()
else:
con = sqlite3.connect(f"{data_folder}/processing.db", timeout=60.0)
cur = con.cursor()
def setup():
"""
Makes data folders
"""
folders_to_create = [data_folder, input_folder,
scratch_folder, output_folder,
f"{input_folder}/en", f"{output_folder}/en",
f"{input_folder}/fr", f"{output_folder}/fr",
f"{input_folder}/metadata"]
for folder in folders_to_create:
if not os.path.exists(folder):
print(f"Making folder {folder}")
os.mkdir(folder)
def update_last_downloaded(product_id):
"""
Updates SQLite database with the last time the table was updated
The datetime is in Eastern timezone, so have to convert to UTC to
be consistent with https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite
"""
filepath = f"{input_folder}/metadata/{product_id}.json"
print(f"Reading metadata {filepath}")
with open(filepath, 'r') as fp:
metadata = json.load(fp)
product_id = metadata.get("object").get("productId")
last_updated = metadata.get("object").get("releaseTime")
# Convert last_updated to UTC since /getAllcubesListLite uses UTC
last_updated = datetime.strptime(last_updated, "%Y-%m-%dT%H:%M")
last_updated = last_updated.replace(tzinfo=ZoneInfo("America/Toronto"))
last_updated = last_updated.astimezone(ZoneInfo("UTC")).isoformat()
data = (product_id, last_updated)
cur.execute("SELECT product_id FROM downloaded WHERE product_id = ?", (product_id,))
result = cur.fetchone()
if not result:
cur.execute("INSERT INTO downloaded (product_id, last_updated) VALUES (?, ?)", data)
else:
cur.execute("UPDATE downloaded SET last_updated = ? WHERE product_id = ?", (last_updated, product_id))
con.commit()
def update_last_processed(product_id):
time_finished_processing = datetime.now().isoformat()
cur.execute("UPDATE downloaded SET last_processed = ? WHERE product_id = ?", (time_finished_processing, product_id))
con.commit()
def update_tables():
"""
This currently does not work as expected because Statistics Canada has discrepancies.
The "releaseTime" listed in https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite
for every pdocutId is not the same as "releaseTime" listed when making a POST
https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata , for example:
[{"productId":10100007}]
"""
cur.execute("""
DELETE FROM cubes;
""")
con.commit()
response = requests.get("https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite").json()
cubes_metadata = pl.from_dicts(response)[['productId', 'releaseTime']]
cubes_metadata = cubes_metadata.rename({"productId": "product_id", "releaseTime": "last_updated"})
cubes_metadata = cubes_metadata.rows()
cubes_metadata_new = []
for cube in cubes_metadata:
product_id, last_updated = cube
# Update the date field so it is formatted the same as date field in downloaded table
last_updated = datetime.strptime(last_updated, "%Y-%m-%dT%H:%M:%SZ").astimezone(ZoneInfo("UTC"))
last_updated = last_updated.isoformat()
cubes_metadata_new.append((product_id, last_updated))
cur.executemany("INSERT INTO cubes VALUES(?, ?)", cubes_metadata_new)
con.commit()
cur.execute("""
SELECT a.product_id
FROM downloaded AS a,
cubes AS b
WHERE a.product_id = b.product_id
AND b.last_updated > a.last_updated
""")
results = cur.fetchall()
for result in results:
product_id = result[0]
print(f"Updating product_id: {product_id}")
download_cube(product_id)
process_cube(product_id)
def compute_ref_date_bounds(df):
"""
TODO:
- Completely rewrite this, AI helpmed me figure out this logic as I had
no experience with dates
"""
"""
- There are cases where the REF_DATE is a range, ex. 2023/2024.
For productId 17100022 the period is from 2023-07-01 to 2024-06-30
(seen in the metadata), so we cannot just use 2023-01-01 and 2024-12-31
"""
series = df["REF_DATE"]
# Initialize the two new columns with NaT
df["REF_START_DATE"] = pd.NaT
df["REF_END_DATE"] = pd.NaT
# Skip rows that contain slashes
valid_mask = ~series.str.contains("/", na=False)
# Case 1: YYYY-MM-DD
full_mask = valid_mask & series.str.fullmatch(r"\d{4}-\d{2}-\d{2}")
parsed_full = pd.to_datetime(series[full_mask], format="%Y-%m-%d", errors="coerce")
df.loc[full_mask, "REF_START_DATE"] = parsed_full
df.loc[full_mask, "REF_END_DATE"] = parsed_full
# Case 2: YYYY-MM
month_mask = valid_mask & series.str.fullmatch(r"\d{4}-\d{2}")
parsed_month = pd.to_datetime(series[month_mask], format="%Y-%m", errors="coerce")
df.loc[month_mask, "REF_START_DATE"] = parsed_month
df.loc[month_mask, "REF_END_DATE"] = parsed_month + pd.to_timedelta(
parsed_month.dt.days_in_month - 1, unit='D'
)
# Case 3: YYYY
year_mask = valid_mask & series.str.fullmatch(r"\d{4}")
parsed_year = pd.to_datetime(series[year_mask], format="%Y", errors="coerce")
df.loc[year_mask, "REF_START_DATE"] = parsed_year
df.loc[year_mask, "REF_END_DATE"] = parsed_year + pd.offsets.YearEnd(0)
# Move columns after REF_DATE
ref_idx = df.columns.get_loc("REF_DATE")
cols = list(df.columns)
cols.remove("REF_START_DATE")
cols.remove("REF_END_DATE")
cols[ref_idx + 1:ref_idx + 1] = ["REF_START_DATE", "REF_END_DATE"]
return df[cols]
def convert_to_lowest_type(df):
"""
Convert columns to the best possible dtypes
For example, if the column is numerical and has a maximum value of 32,000
we can assign it a type of int16
"""
dtypes = pd.DataFrame(df.dtypes)
# Downcast to the smallest numerical dtype
for row in dtypes.itertuples():
column = row[0]
the_type = str(row[1])
if the_type == 'Int64':
df[column] = pd.to_numeric(df[column], downcast='integer')
return df
def extract_zipfile(product_id, language):
"""
It is faster to extract the zip file and read the CSV, than open
via zipfile and then Pandas
"""
zip_file = f"{input_folder}/{language}/{product_id}.zip"
with zipfile.ZipFile(zip_file) as myzip:
print(f"Extracting {zip_file} to {scratch_folder}")
myzip.extractall(path=scratch_folder)
def get_cube_metadata(product_id):
url = f"https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata"
cubes_payload = [{"productId": product_id}]
result = requests.post(url, json=cubes_payload)
result = result.json()[0]
return result
def download_cube(product_id, language="en"):
"""
Downloads the English CSV for a specific table
"""
download_url = f"https://www150.statcan.gc.ca/t1/wds/rest/getFullTableDownloadCSV/{product_id}/en"
response = requests.get(download_url).json()
zip_url = response['object']
zip_file_name = f"{input_folder}/{language}/{product_id}.zip"
print(f"Downloading {zip_url} to {zip_file_name}")
response = requests.get(zip_url, stream=True, headers={"user-agent": None})
progress_bar = tqdm(
desc=zip_file_name,
total=int(response.headers.get("content-length", 0)),
unit="B",
unit_scale=True
)
with open(zip_file_name, "wb") as handle:
for chunk in response.iter_content(chunk_size=512):
if chunk: # filter out keep-alive new chunks
handle.write(chunk)
progress_bar.update(len(chunk))
progress_bar.close()
def cleanup_product(product_id):
"""
Remove the scratch files for a given productId
"""
print(f"Removing scratch files for productId {product_id}")
os.remove(f"{scratch_folder}/{product_id}.csv")
os.remove(f"{scratch_folder}/{product_id}_MetaData.csv")
def process_cube(product_id, language="en"):
"""
Examples:
- productId 43100011 has all with DECIMAL = 1 (float64)
- productId 17100009 has DECIMAL = 0 (int64)
- productId 35100076 has multiple DECIMAL precisions [0, 1, 2] (int64, float64, float64)
- The duplicate column issue just happens with two column names in all data products ("Value", "VALUE", and "Status", "STATUS")
- productId 10100164 has two columns named the same "Value" and "VALUE". DuckDB treats column names in a case insensitve manner, so
"Value" and "VALUE" are the same. So we will need to rename "Value" to "Value.1"
- productId 13100902 has two columns named the same "Status" and "STATUS". We will need to rename "Status" to "STATUS"
- productId 13100442 has 18 fields, but 19 fields were seen in line 162
- There are cases where the "DECIMALS" column does not exist in the CSV. productId 98100001 is one example.
In this case, we do let the .read_csv method guess the data types
"""
cur.execute("SELECT product_id FROM downloaded WHERE product_id = ?", (product_id,))
result = cur.fetchone()
if result:
print(f"Already processed {product_id}")
return
extract_zipfile(product_id, language)
"""
The pandas column reader is better than the Polars one
Here is an example where polars was not reading it right:
https://www150.statcan.gc.ca/n1/tbl/csv/98100404-eng.zip
"""
# Get metadata
#metadata_file = f"{input_folder}/metadata/{product_id}.json"
#metadata = get_cube_metadata(product_id)
#print(f"Writing metadata file {metadata_file}")
#with open(metadata_file, "w") as outfile:
# json.dump(metadata, outfile)
# Read CSV using Pandas
product_csv = f"{scratch_folder}/{product_id}.csv"
print(f"Reading {product_csv}")
parameters = {
"filepath_or_buffer": product_csv,
"engine": "c",
#"nrows": 100000,
"dtype": {}
}
columns = pd.read_csv(product_csv, nrows=0).columns
columns_to_rename = ['Value', 'Status']
for column in columns_to_rename:
if column in columns:
print(f"Renaming '{column}' to '{column}.1'")
columns = [f'{column}.1' if x == column else x for x in columns]
# Explicitly tell pandas to not read column names from CSV
parameters["header"] = 0
parameters["names"] = columns
columns_always_int_8 = ["DECIMALS", "SCALAR_ID"]
for column in columns_always_int_8:
if column in columns:
parameters["dtype"][column] = 'int8'
columns_always_int_16 = ["UOM_ID"]
for column in columns_always_int_16:
if column in columns:
parameters["dtype"][column] = 'int16'
# REF_DATE, GEO, DGUID should always be string
columns_always_string = ["REF_DATE", "GEO", "DGUID"]
for column in columns_always_string:
if column in columns:
parameters["dtype"][column] = 'string'
# The remaining columns should be string, with the exception of VALUE
# Added "DECIMAL" check as there can be numeric columns that are not the VALUE column
if "DECIMALS" in columns:
for column in columns:
if column not in columns_always_int_8 and column not in columns_always_int_16 and column != "VALUE":
parameters["dtype"][column] = 'string'
if not parameters["dtype"]:
del parameters["dtype"]
print(f"Reading {product_csv} as a Pandas dataframe")
try:
df = pd.read_csv(**parameters)
except Exception:
logging.error(f"Failed to process productId: {product_id}")
cleanup_product(product_id)
return
if "DECIMALS" in columns:
unique_decimal_values = df["DECIMALS"].unique()
#print(unique_decimal_values)
if any(unique_decimal_values):
"""
A table can have both float and integer in the VALUE field.
productId 11100025 is an example
So if we have unique values for DECIMALS to be [0,1], then we convert to float64
"""
convert_dict = {"VALUE": "float64"}
#print(convert_dict)
df = df.astype(convert_dict)
elif 0 in (unique_decimal_values):
if df["VALUE"].dtype != "Int64":
# If DECIMALS = [0]
convert_dict = {"VALUE": "Int64"}
#print(convert_dict)
df = df.astype(convert_dict)
else:
parameters = {
"convert_string": True,
"convert_boolean": False
}
#print("DECIMALS not in columns, using .convert_dtypes")
df = df.convert_dtypes(**parameters)
df = convert_to_lowest_type(df)
df = compute_ref_date_bounds(df)
output_parquet = f"{output_folder}/{language}/{product_id}.parquet"
print(f"Exporting dataframe as parquet to {output_parquet}")
parameters = {
"path": output_parquet,
"engine": "pyarrow",
"compression": "zstd",
"index": False,
"compression_level": 22
}
df.to_parquet(**parameters)
# Remove scratch files
cleanup_product(product_id)
update_last_downloaded(product_id)
update_last_processed(product_id)
if __name__ == '__main__':
setup()
files_to_process = glob.glob(f"{input_folder}/en/*.zip")
# Get the product_id
files_to_process = [x.split("/")[-1].split(".zip")[0] for x in files_to_process]
to_process = len(files_to_process)
print(f"Processing {to_process}")
with Pool(processes=2) as p:
p.map(process_cube, files_to_process, chunksize=1)