"""
Download Statistics Canada (StatCan) data tables and convert them to Parquet.

Recovered from commit d6d132e97b2291a7c3956a2eeecabfa81207b1da
("Initial commit", Diego Ripley, 2026-02-19). Companion to
https://www.diegoripley.ca/blog/2025/what-i-learned-from-processing-all-statcan-tables/

TODO:
- Modularize code
- Improve documentation
- Create test cases
"""
import contextlib
from datetime import datetime
import glob
import json
import logging
from multiprocessing import Pool
import os
import sqlite3
import zipfile
from zoneinfo import ZoneInfo

import pandas as pd
# TODO: Look into whether polars CSV parsing has improved since 2025-06-19.
# I encountered a bunch of issues with properly parsing StatCan CSVs
import polars as pl
import requests
from tqdm import tqdm

data_folder = "/data/tables"
input_folder = f"{data_folder}/input"
scratch_folder = f"{data_folder}/scratch"
output_folder = f"{data_folder}/output"

# Seconds to wait on any StatCan HTTP call before giving up; without a
# timeout a stalled connection blocks a worker forever.
REQUEST_TIMEOUT = 60
# Bytes written per iteration while streaming a zip archive to disk.
DOWNLOAD_CHUNK_SIZE = 1024 * 1024

UTC = ZoneInfo("UTC")
EASTERN = ZoneInfo("America/Toronto")

# TODO: Do logging properly
logging.basicConfig(
    filename="processing.log",
    encoding="utf-8",
    filemode="a",
    format="{asctime} - {levelname} - {message}",
    style="{",
    datefmt="%Y-%m-%d %H:%M",
)


def _open_database():
    """
    Open (creating if needed) the tracking database and return (con, cur).

    TODO: Do this properly.
    These are the requirements:
    - Lightweight way for users to keep track what StatCan tables they have
      downloaded, and for them to update the table when new data is available
    """
    # sqlite3 cannot create the database file inside a missing directory, so
    # the folder has to exist before connecting (setup() runs much later).
    os.makedirs(data_folder, exist_ok=True)
    connection = sqlite3.connect(f"{data_folder}/processing.db", timeout=60.0)
    cursor = connection.cursor()
    # IF NOT EXISTS makes this safe to run against an existing database.
    cursor.executescript("""
        CREATE TABLE IF NOT EXISTS downloaded (
            product_id TEXT PRIMARY KEY,
            last_updated TEXT,
            last_processed TEXT
        );

        CREATE TABLE IF NOT EXISTS cubes (
            product_id TEXT PRIMARY KEY,
            last_updated TEXT
        );
    """)
    connection.commit()
    return connection, cursor


con, cur = _open_database()


def _init_worker():
    """
    Give a multiprocessing worker its own SQLite connection.

    SQLite connections must not be used across fork(), so every worker
    replaces the connection it inherited from the parent process.
    """
    global con, cur
    con, cur = _open_database()


def setup():
    """
    Makes data folders
    """
    folders_to_create = [data_folder, input_folder,
                         scratch_folder, output_folder,
                         f"{input_folder}/en", f"{output_folder}/en",
                         f"{input_folder}/fr", f"{output_folder}/fr",
                         f"{input_folder}/metadata"]
    for folder in folders_to_create:
        if not os.path.exists(folder):
            print(f"Making folder {folder}")
            # makedirs also creates missing parents and tolerates a
            # concurrent creation by another process.
            os.makedirs(folder, exist_ok=True)


def update_last_downloaded(product_id):
    """
    Updates SQLite database with the last time the table was updated

    Reads {input_folder}/metadata/{product_id}.json. The "releaseTime" in
    that file is in Eastern timezone, so it is converted to UTC to be
    consistent with
    https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite

    Raises FileNotFoundError when the metadata file does not exist.
    """
    filepath = f"{input_folder}/metadata/{product_id}.json"
    print(f"Reading metadata {filepath}")
    with open(filepath, "r", encoding="utf-8") as fp:
        metadata = json.load(fp)

    cube = metadata.get("object")
    # The productId recorded in the metadata wins over the argument.
    product_id = cube.get("productId")
    # Convert last_updated to UTC since /getAllCubesListLite uses UTC
    released = datetime.strptime(cube.get("releaseTime"), "%Y-%m-%dT%H:%M")
    released = released.replace(tzinfo=EASTERN)
    last_updated = released.astimezone(UTC).isoformat()

    cur.execute("SELECT product_id FROM downloaded WHERE product_id = ?",
                (product_id,))
    if cur.fetchone():
        cur.execute(
            "UPDATE downloaded SET last_updated = ? WHERE product_id = ?",
            (last_updated, product_id))
    else:
        cur.execute(
            "INSERT INTO downloaded (product_id, last_updated) VALUES (?, ?)",
            (product_id, last_updated))
    con.commit()


def update_last_processed(product_id):
    """
    Records the time (UTC, ISO 8601) at which product_id finished processing.
    """
    # Timezone-aware UTC, matching the format of the last_updated columns.
    time_finished_processing = datetime.now(tz=UTC).isoformat()
    cur.execute("UPDATE downloaded SET last_processed = ? WHERE product_id = ?",
                (time_finished_processing, product_id))
    con.commit()


def update_tables():
    """
    Re-downloads and re-processes every downloaded table that has a newer
    release listed by StatCan.

    This currently does not work as expected because Statistics Canada has
    discrepancies. The "releaseTime" listed in
    https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite
    for every productId is not the same as "releaseTime" listed when making
    a POST to https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata ,
    for example: [{"productId":10100007}]
    """
    # Fetch before touching the table, so a failed request does not leave
    # the cubes table empty.
    response = requests.get(
        "https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite",
        timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    cubes_metadata = pl.from_dicts(response.json())[['productId', 'releaseTime']]

    cubes_metadata_new = []
    for product_id, last_updated in cubes_metadata.rows():
        # Update the date field so it is formatted the same as the date
        # field in the downloaded table. The value is already UTC ("Z"), so
        # the zone is attached with replace(); astimezone() on a naive
        # datetime would wrongly treat it as local machine time.
        parsed = datetime.strptime(last_updated, "%Y-%m-%dT%H:%M:%SZ")
        cubes_metadata_new.append(
            (product_id, parsed.replace(tzinfo=UTC).isoformat()))

    cur.execute("DELETE FROM cubes;")
    cur.executemany("INSERT INTO cubes VALUES(?, ?)", cubes_metadata_new)
    con.commit()

    # Both columns hold ISO 8601 strings with a +00:00 offset, so comparing
    # them as text orders them chronologically.
    cur.execute("""
        SELECT a.product_id
        FROM downloaded AS a
        JOIN cubes AS b ON a.product_id = b.product_id
        WHERE b.last_updated > a.last_updated
    """)
    for (product_id,) in cur.fetchall():
        print(f"Updating product_id: {product_id}")
        download_cube(product_id)
        # Refresh the metadata too, otherwise the old releaseTime is written
        # back and the table is flagged as stale again on the next run.
        save_cube_metadata(product_id)
        # force=True: these tables are by definition already in "downloaded",
        # which process_cube otherwise treats as "nothing to do".
        process_cube(product_id, force=True)


def compute_ref_date_bounds(df):
    """
    Adds REF_START_DATE and REF_END_DATE columns right after REF_DATE.

    Supported REF_DATE layouts:
    - YYYY-MM-DD: start and end are that day
    - YYYY-MM: first and last day of that month
    - YYYY: January 1st and December 31st of that year

    There are cases where the REF_DATE is a range, ex. 2023/2024.
    For productId 17100022 the period is from 2023-07-01 to 2024-06-30
    (seen in the metadata), so we cannot just use 2023-01-01 and 2024-12-31.
    Rows containing a slash, and rows in any other layout, are left as NaT.

    The two columns are added to df in place; the returned dataframe has
    them moved next to REF_DATE. A dataframe without a REF_DATE column is
    returned unchanged.

    TODO:
    - Completely rewrite this, AI helped me figure out this logic as I had
      no experience with dates
    """
    if "REF_DATE" not in df.columns:
        return df

    series = df["REF_DATE"]
    # Initialize the two new columns with NaT
    df["REF_START_DATE"] = pd.NaT
    df["REF_END_DATE"] = pd.NaT

    # Skip rows that contain slashes
    valid_mask = ~series.str.contains("/", na=False)

    def matching(pattern):
        # na=False keeps missing REF_DATE values out of the mask.
        return valid_mask & series.str.fullmatch(pattern, na=False)

    # Case 1: YYYY-MM-DD
    full_mask = matching(r"\d{4}-\d{2}-\d{2}")
    parsed_full = pd.to_datetime(series[full_mask], format="%Y-%m-%d",
                                 errors="coerce")
    df.loc[full_mask, "REF_START_DATE"] = parsed_full
    df.loc[full_mask, "REF_END_DATE"] = parsed_full

    # Case 2: YYYY-MM
    month_mask = matching(r"\d{4}-\d{2}")
    parsed_month = pd.to_datetime(series[month_mask], format="%Y-%m",
                                  errors="coerce")
    df.loc[month_mask, "REF_START_DATE"] = parsed_month
    df.loc[month_mask, "REF_END_DATE"] = parsed_month + pd.to_timedelta(
        parsed_month.dt.days_in_month - 1, unit='D'
    )

    # Case 3: YYYY
    year_mask = matching(r"\d{4}")
    parsed_year = pd.to_datetime(series[year_mask], format="%Y",
                                 errors="coerce")
    df.loc[year_mask, "REF_START_DATE"] = parsed_year
    df.loc[year_mask, "REF_END_DATE"] = parsed_year + pd.offsets.YearEnd(0)

    # Move columns after REF_DATE
    bounds = ["REF_START_DATE", "REF_END_DATE"]
    cols = [name for name in df.columns if name not in bounds]
    insert_at = cols.index("REF_DATE") + 1
    cols[insert_at:insert_at] = bounds

    return df[cols]


def convert_to_lowest_type(df):
    """
    Convert columns to the best possible dtypes
    For example, if the column is numerical and has a maximum value of 32,000
    we can assign it a type of int16

    Only columns whose dtype is the nullable "Int64" are downcast; the
    dataframe is modified in place and also returned.
    """
    for column, dtype in df.dtypes.items():
        if str(dtype) == 'Int64':
            df[column] = pd.to_numeric(df[column], downcast='integer')

    return df


def extract_zipfile(product_id, language):
    """
    Extracts {input_folder}/{language}/{product_id}.zip into scratch_folder.

    It is faster to extract the zip file and read the CSV, than open
    via zipfile and then Pandas
    """
    zip_file = f"{input_folder}/{language}/{product_id}.zip"
    with zipfile.ZipFile(zip_file) as myzip:
        print(f"Extracting {zip_file} to {scratch_folder}")
        myzip.extractall(path=scratch_folder)


def get_cube_metadata(product_id):
    """
    Returns the first element of the getCubeMetadata response for product_id.

    Raises requests.HTTPError when the service answers with an error status.
    """
    url = "https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata"
    cubes_payload = [{"productId": product_id}]
    result = requests.post(url, json=cubes_payload, timeout=REQUEST_TIMEOUT)
    result.raise_for_status()
    return result.json()[0]


def save_cube_metadata(product_id):
    """
    Fetches the metadata of product_id and writes it to
    {input_folder}/metadata/{product_id}.json, returning that path.
    """
    metadata_file = f"{input_folder}/metadata/{product_id}.json"
    metadata = get_cube_metadata(product_id)
    print(f"Writing metadata file {metadata_file}")
    with open(metadata_file, "w", encoding="utf-8") as outfile:
        json.dump(metadata, outfile)
    return metadata_file


def download_cube(product_id, language="en"):
    """
    Downloads the full-table CSV zip for a specific table to
    {input_folder}/{language}/{product_id}.zip

    language selects both the requested CSV and the destination folder.
    NOTE(review): the StatCan service is expected to accept "en" and "fr"
    here - confirm against the WDS user guide.

    Raises requests.HTTPError when either request fails.
    """
    download_url = f"https://www150.statcan.gc.ca/t1/wds/rest/getFullTableDownloadCSV/{product_id}/{language}"
    lookup = requests.get(download_url, timeout=REQUEST_TIMEOUT)
    lookup.raise_for_status()
    zip_url = lookup.json()['object']

    zip_file_name = f"{input_folder}/{language}/{product_id}.zip"
    # Stream into a temporary name: an interrupted download must never be
    # left behind as a "*.zip" that a later run would try to process.
    partial_file_name = f"{zip_file_name}.part"
    print(f"Downloading {zip_url} to {zip_file_name}")
    with requests.get(zip_url, stream=True, headers={"user-agent": None},
                      timeout=REQUEST_TIMEOUT) as response:
        response.raise_for_status()
        progress_bar = tqdm(
            desc=zip_file_name,
            total=int(response.headers.get("content-length", 0)),
            unit="B",
            unit_scale=True
        )
        with progress_bar, open(partial_file_name, "wb") as handle:
            for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                if chunk:  # filter out keep-alive new chunks
                    handle.write(chunk)
                    progress_bar.update(len(chunk))
    os.replace(partial_file_name, zip_file_name)


def cleanup_product(product_id):
    """
    Remove the scratch files for a given productId

    Files that are already gone are ignored, so this is safe to call after
    a failed or partial extraction.
    """
    print(f"Removing scratch files for productId {product_id}")
    for scratch_file in (f"{scratch_folder}/{product_id}.csv",
                         f"{scratch_folder}/{product_id}_MetaData.csv"):
        with contextlib.suppress(FileNotFoundError):
            os.remove(scratch_file)


def _build_read_csv_parameters(product_csv):
    """
    Returns (read_csv keyword arguments, column names) for a product CSV.
    """
    parameters = {
        "filepath_or_buffer": product_csv,
        "engine": "c",
        "dtype": {}
    }
    # Read only the header row to learn the column names.
    columns = pd.read_csv(product_csv, nrows=0).columns
    for column in ('Value', 'Status'):
        if column in columns:
            print(f"Renaming '{column}' to '{column}.1'")
            columns = [f'{column}.1' if x == column else x for x in columns]
            # Replace the header row of the CSV with our own column names
            parameters["header"] = 0
            parameters["names"] = columns

    columns_always_int_8 = ["DECIMALS", "SCALAR_ID"]
    columns_always_int_16 = ["UOM_ID"]
    # REF_DATE, GEO, DGUID should always be string
    columns_always_string = ["REF_DATE", "GEO", "DGUID"]
    fixed_dtypes = (
        (columns_always_int_8, 'int8'),
        (columns_always_int_16, 'int16'),
        (columns_always_string, 'string'),
    )
    for names, dtype in fixed_dtypes:
        for column in names:
            if column in columns:
                parameters["dtype"][column] = dtype

    # The remaining columns should be string, with the exception of VALUE
    # Added "DECIMALS" check as there can be numeric columns that are not
    # the VALUE column
    if "DECIMALS" in columns:
        keep_inferred = {*columns_always_int_8, *columns_always_int_16, "VALUE"}
        for column in columns:
            if column not in keep_inferred:
                parameters["dtype"][column] = 'string'

    if not parameters["dtype"]:
        del parameters["dtype"]

    return parameters, columns


def _coerce_value_dtype(df):
    """
    Gives VALUE the dtype implied by the DECIMALS column, or lets pandas
    pick dtypes when the table has no DECIMALS column.
    """
    if "DECIMALS" not in df.columns:
        return df.convert_dtypes(convert_string=True, convert_boolean=False)

    unique_decimal_values = df["DECIMALS"].unique()
    if any(unique_decimal_values):
        # A table can have both float and integer in the VALUE field.
        # productId 11100025 is an example
        # So if we have unique values for DECIMALS to be [0,1], then we
        # convert to float64
        return df.astype({"VALUE": "float64"})
    if 0 in unique_decimal_values and df["VALUE"].dtype != "Int64":
        # If DECIMALS = [0]
        return df.astype({"VALUE": "Int64"})
    return df


def process_cube(product_id, language="en", *, force=False):
    """
    Converts the downloaded zip of product_id into a zstd-compressed Parquet
    file at {output_folder}/{language}/{product_id}.parquet and records the
    result in the "downloaded" table.

    A product that is already listed in "downloaded" is skipped unless
    force=True. A CSV that pandas cannot read is logged to processing.log
    and skipped. Scratch files are removed in every case.

    Examples:
    - productId 43100011 has all with DECIMAL = 1 (float64)
    - productId 17100009 has DECIMAL = 0 (int64)
    - productId 35100076 has multiple DECIMAL precisions [0, 1, 2]
      (int64, float64, float64)
    - The duplicate column issue just happens with two column names in all
      data products ("Value", "VALUE", and "Status", "STATUS")
    - productId 10100164 has two columns named the same "Value" and "VALUE".
      DuckDB treats column names in a case insensitive manner, so "Value" and
      "VALUE" are the same. So we will need to rename "Value" to "Value.1"
    - productId 13100902 has two columns named the same "Status" and
      "STATUS". We will need to rename "Status" to "Status.1"
    - productId 13100442 has 18 fields, but 19 fields were seen in line 162
    - There are cases where the "DECIMALS" column does not exist in the CSV.
      productId 98100001 is one example.
      In this case, we do let the .read_csv method guess the data types

    The pandas column reader is better than the Polars one
    Here is an example where polars was not reading it right:
    https://www150.statcan.gc.ca/n1/tbl/csv/98100404-eng.zip
    """
    if not force:
        cur.execute("SELECT product_id FROM downloaded WHERE product_id = ?",
                    (product_id,))
        if cur.fetchone():
            print(f"Already processed {product_id}")
            return

    extract_zipfile(product_id, language)
    product_csv = f"{scratch_folder}/{product_id}.csv"
    try:
        # Read CSV using Pandas
        print(f"Reading {product_csv}")
        try:
            parameters, _ = _build_read_csv_parameters(product_csv)
            print(f"Reading {product_csv} as a Pandas dataframe")
            df = pd.read_csv(**parameters)
        except Exception:
            # Deliberately broad: one unreadable table must not stop the
            # batch. exception() keeps the traceback in processing.log.
            logging.exception("Failed to process productId: %s", product_id)
            return

        df = _coerce_value_dtype(df)
        df = convert_to_lowest_type(df)
        df = compute_ref_date_bounds(df)

        output_parquet = f"{output_folder}/{language}/{product_id}.parquet"
        print(f"Exporting dataframe as parquet to {output_parquet}")
        df.to_parquet(
            path=output_parquet,
            engine="pyarrow",
            compression="zstd",
            index=False,
            compression_level=22,
        )
    finally:
        # Remove scratch files, also when a later step raised.
        cleanup_product(product_id)

    # update_last_downloaded needs the metadata file; fetch it when it has
    # not been saved yet instead of failing after the parquet was written.
    if not os.path.exists(f"{input_folder}/metadata/{product_id}.json"):
        save_cube_metadata(product_id)
    update_last_downloaded(product_id)
    update_last_processed(product_id)


def main():
    """
    Processes every zip found in {input_folder}/en using two workers.
    """
    setup()
    files_to_process = glob.glob(f"{input_folder}/en/*.zip")
    # Get the product_id
    files_to_process = [os.path.splitext(os.path.basename(x))[0]
                        for x in files_to_process]
    to_process = len(files_to_process)
    print(f"Processing {to_process}")
    with Pool(processes=2, initializer=_init_worker) as p:
        p.map(process_cube, files_to_process, chunksize=1)


if __name__ == '__main__':
    main()