Experiment with Jupyter notebook on downloading and processing statcan cubes

This commit is contained in:
Diego Ripley
2025-06-18 21:26:51 +00:00
parent c0899080f4
commit faa63451ab
@@ -0,0 +1,507 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "8b767621-b96b-4eaa-a16a-0329bab29c0f",
"metadata": {},
"source": [
"# Notes\n",
"- Make sure Statistics Canada knows about these issues:\n",
" - When downloading the XML for product_id 98100404 it just returns the structure document, not the data document\n",
" - The releaseTime value is in Eastern Standard Zone for https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata and UTC for https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite\n",
" - The releaseTime value is different when getting it from https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite and https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata . For example productId 10100007"
]
},
{
"cell_type": "code",
"execution_count": 188,
"id": "98859cd6-6fa4-4aef-a113-455699524fae",
"metadata": {},
"outputs": [],
"source": [
"from datetime import datetime\n",
"import glob\n",
"from multiprocessing import Pool\n",
"import json\n",
"import os\n",
"import sqlite3\n",
"import zipfile\n",
"from zoneinfo import ZoneInfo\n",
"\n",
"import pandas as pd\n",
"import polars as pl\n",
"import requests\n",
"from tqdm import tqdm\n",
"\n",
"data_folder = \"/data/tables\"\n",
"input_folder = f\"{data_folder}/input\"\n",
"scratch_folder = f\"{data_folder}/scratch\"\n",
"output_folder = f\"{data_folder}/output\"\n",
"\n",
"\n",
"if not os.path.exists(\"processing.db\"):\n",
" con = sqlite3.connect('processing.db')\n",
" cur = con.cursor()\n",
" cur.executescript(\"\"\"\n",
" CREATE TABLE IF NOT EXISTS downloaded (\n",
" product_id TEXT PRIMARY KEY,\n",
" last_updated TEXT\n",
" );\n",
"\n",
" CREATE TABLE IF NOT EXISTS cubes (\n",
" product_id TEXT PRIMARY KEY,\n",
" last_updated TEXT\n",
" );\n",
" \"\"\")\n",
" con.commit()\n",
"else:\n",
" con = sqlite3.connect('processing.db')\n",
" cur = con.cursor()"
]
},
{
"cell_type": "code",
"execution_count": 96,
"id": "28ac4c01-c1c5-427f-bb2c-0da99a4c5591",
"metadata": {},
"outputs": [],
"source": [
"def setup():\n",
" \"\"\"\n",
" Makes data folders\n",
" \"\"\"\n",
" folders_to_create = [data_folder, input_folder, \n",
" scratch_folder, output_folder,\n",
" f\"{input_folder}/en\", f\"{output_folder}/en\",\n",
" f\"{input_folder}/fr\", f\"{output_folder}/fr\",\n",
" f\"{input_folder}/metadata\"]\n",
" for folder in folders_to_create:\n",
" if not os.path.exists(folder):\n",
" print(f\"Making folder {folder}\")\n",
" os.mkdir(folder)"
]
},
{
"cell_type": "code",
"execution_count": 97,
"id": "9daa94f4-16c9-4d8b-951e-3d5d38eb618f",
"metadata": {},
"outputs": [],
"source": [
"setup()"
]
},
{
"cell_type": "code",
"execution_count": 177,
"id": "0af9a4b3-7b59-460b-b933-504919d4bd2a",
"metadata": {},
"outputs": [],
"source": [
"def update_last_downloaded(product_id):\n",
" filepath = f\"{input_folder}/metadata/{product_id}.json\"\n",
" print(f\"Reading metadata {filepath}\")\n",
" with open(filepath, 'r') as fp:\n",
" metadata = json.load(fp)\n",
" product_id = metadata.get(\"object\").get(\"productId\")\n",
" last_updated = metadata.get(\"object\").get(\"releaseTime\")\n",
" # Convert last_updated to UTC since /getAllcubesListLite uses UTC\n",
" last_updated = datetime.strptime(last_updated, \"%Y-%m-%dT%H:%M\")\n",
" last_updated = last_updated.replace(tzinfo=ZoneInfo(\"America/Toronto\"))\n",
" last_updated = last_updated.astimezone(ZoneInfo(\"UTC\")).isoformat()\n",
" \n",
" data = (product_id, last_updated)\n",
" cur.execute(\"SELECT product_id FROM downloaded WHERE product_id = ?\", (product_id,))\n",
" result = cur.fetchone()\n",
" if not result:\n",
" cur.execute(\"INSERT INTO downloaded VALUES (?, ?)\", data)\n",
" else:\n",
" cur.execute(\"UPDATE downloaded SET last_updated = ? WHERE product_id = ?\", (last_updated, product_id))\n",
"\n",
" con.commit()"
]
},
{
"cell_type": "code",
"execution_count": 170,
"id": "23cbabc3-0d4b-4e28-a4df-b2c5e8c7ea8b",
"metadata": {},
"outputs": [],
"source": [
"def update_tables():\n",
" \"\"\"\n",
" This currently does not work as expected because Statistics Canada has discrepancies.\n",
" The \"releaseTime\" listed in https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite\n",
" for every pdocutId is not the same as \"releaseTime\" listed when making a POST \n",
" https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata , for example:\n",
" [{\"productId\":10100007}]\n",
" \"\"\"\n",
" cur.execute(\"\"\"\n",
" DELETE FROM cubes;\n",
" \"\"\")\n",
" con.commit()\n",
" response = requests.get(\"https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite\").json()\n",
" cubes_metadata = pl.from_dicts(response)[['productId', 'releaseTime']]\n",
" cubes_metadata = cubes_metadata.rename({\"productId\": \"product_id\", \"releaseTime\": \"last_updated\"})\n",
" cubes_metadata = cubes_metadata.rows()\n",
" cubes_metadata_new = []\n",
" for cube in cubes_metadata:\n",
" product_id, last_updated = cube\n",
" # Update the date field so it is formatted the same as date field in downloaded table\n",
" last_updated = datetime.strptime(last_updated, \"%Y-%m-%dT%H:%M:%SZ\").astimezone(ZoneInfo(\"UTC\"))\n",
" last_updated = last_updated.isoformat()\n",
" cubes_metadata_new.append((product_id, last_updated))\n",
" \n",
" cur.executemany(\"INSERT INTO cubes VALUES(?, ?)\", cubes_metadata_new)\n",
" con.commit()\n",
"\n",
" cur.execute(\"\"\"\n",
" SELECT a.product_id\n",
" FROM downloaded AS a,\n",
" cubes AS b\n",
" WHERE a.product_id = b.product_id\n",
" AND b.last_updated > a.last_updated\n",
" \"\"\")\n",
" results = cur.fetchall()\n",
" for result in results:\n",
" product_id = result[0]\n",
" print(f\"Updating product_id: {product_id}\")\n",
" process_cube(product_id)"
]
},
{
"cell_type": "code",
"execution_count": 186,
"id": "c0d46f38-242c-4685-b8b6-4e046c23aec5",
"metadata": {},
"outputs": [],
"source": [
"cur.execute(\"\"\"\n",
" SELECT a.product_id, a.last_updated, b.last_updated\n",
" FROM downloaded AS a,\n",
" cubes AS b\n",
" WHERE a.product_id = b.product_id\n",
" AND b.last_updated > a.last_updated\n",
"\"\"\")\n",
"difference = pd.DataFrame(cur.fetchall(), columns=[\"product_id\", \"release_time_metadata\", \"release_time_cubelist\"])"
]
},
{
"cell_type": "code",
"execution_count": 189,
"id": "a5e12a7a-9891-4d58-9368-e31b274feb1d",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>product_id</th>\n",
" <th>release_time_metadata</th>\n",
" <th>release_time_cubelist</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>10100006</td>\n",
" <td>2025-05-21T12:30:00+00:00</td>\n",
" <td>2025-06-17T12:30:00+00:00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>10100108</td>\n",
" <td>2025-05-13T12:30:00+00:00</td>\n",
" <td>2025-06-12T12:30:00+00:00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>10100132</td>\n",
" <td>2025-06-06T12:30:00+00:00</td>\n",
" <td>2025-06-13T12:30:00+00:00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>10100136</td>\n",
" <td>2025-06-10T12:30:00+00:00</td>\n",
" <td>2025-06-16T12:30:00+00:00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>10100138</td>\n",
" <td>2025-06-06T12:30:00+00:00</td>\n",
" <td>2025-06-13T12:30:00+00:00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>92</th>\n",
" <td>38100235</td>\n",
" <td>2025-03-13T12:30:00+00:00</td>\n",
" <td>2025-06-12T12:30:00+00:00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>93</th>\n",
" <td>38100236</td>\n",
" <td>2025-03-13T12:30:00+00:00</td>\n",
" <td>2025-06-12T12:30:00+00:00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>94</th>\n",
" <td>38100237</td>\n",
" <td>2025-03-13T12:30:00+00:00</td>\n",
" <td>2025-06-12T12:30:00+00:00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>95</th>\n",
" <td>38100238</td>\n",
" <td>2025-03-13T12:30:00+00:00</td>\n",
" <td>2025-06-12T12:30:00+00:00</td>\n",
" </tr>\n",
" <tr>\n",
" <th>96</th>\n",
" <td>38100164</td>\n",
" <td>2023-09-13T12:30:00+00:00</td>\n",
" <td>2025-06-17T12:30:00+00:00</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>97 rows × 3 columns</p>\n",
"</div>"
],
"text/plain": [
" product_id release_time_metadata release_time_cubelist\n",
"0 10100006 2025-05-21T12:30:00+00:00 2025-06-17T12:30:00+00:00\n",
"1 10100108 2025-05-13T12:30:00+00:00 2025-06-12T12:30:00+00:00\n",
"2 10100132 2025-06-06T12:30:00+00:00 2025-06-13T12:30:00+00:00\n",
"3 10100136 2025-06-10T12:30:00+00:00 2025-06-16T12:30:00+00:00\n",
"4 10100138 2025-06-06T12:30:00+00:00 2025-06-13T12:30:00+00:00\n",
".. ... ... ...\n",
"92 38100235 2025-03-13T12:30:00+00:00 2025-06-12T12:30:00+00:00\n",
"93 38100236 2025-03-13T12:30:00+00:00 2025-06-12T12:30:00+00:00\n",
"94 38100237 2025-03-13T12:30:00+00:00 2025-06-12T12:30:00+00:00\n",
"95 38100238 2025-03-13T12:30:00+00:00 2025-06-12T12:30:00+00:00\n",
"96 38100164 2023-09-13T12:30:00+00:00 2025-06-17T12:30:00+00:00\n",
"\n",
"[97 rows x 3 columns]"
]
},
"execution_count": 189,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"difference"
]
},
{
"cell_type": "code",
"execution_count": 193,
"id": "144e3716-b0e7-4a39-9a25-ededea506f4f",
"metadata": {},
"outputs": [],
"source": [
"def convert_to_lowest_type(df):\n",
" \"\"\"\n",
" Convert columns to the best possible dtypes\n",
" For example, if the column is numerical and has a maximum value of 32,000 \n",
" we can assign it a type of int16\n",
" \"\"\"\n",
" print(\"Converting dataframe to optimal data types\")\n",
" params = {\n",
" 'convert_string': False,\n",
" 'convert_boolean': False\n",
" }\n",
" df = df.convert_dtypes(**params)\n",
"\n",
" dtypes = pd.DataFrame(df.dtypes)\n",
" # Downcast to the smallest numerical dtype\n",
" for row in dtypes.itertuples():\n",
" column = row[0]\n",
" the_type = str(row[1])\n",
" # Skipping downcasting Float64 as there were issues with decimal places\n",
" # For example, instead of a value being 65.4, it turned into 65.4000015258789\n",
" if the_type == 'Float64':\n",
" continue\n",
" elif the_type == 'Int64':\n",
" df[column] = pd.to_numeric(df[column], downcast='integer')\n",
"\n",
" return df\n",
"\n",
"def extract_zipfile(product_id, language):\n",
" \"\"\"\n",
" It is faster to extract the zip file and read the CSV, than open\n",
" via zipfile and then Pandas\n",
" \"\"\"\n",
" zip_file = f\"{input_folder}/{language}/{product_id}.zip\"\n",
" with zipfile.ZipFile(zip_file) as myzip:\n",
" print(f\"Extracting {zip_file} to {scratch_folder}\")\n",
" myzip.extractall(path=scratch_folder)\n",
"\n",
"def get_cube_metadata(product_id):\n",
" url = f\"https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata\"\n",
" cubes_payload = [{\"productId\": product_id}]\n",
" result = requests.post(url, json=cubes_payload)\n",
" result = result.json()[0]\n",
" return result\n",
"\n",
"def download_cube(product_id, language=\"en\"):\n",
" download_url = f\"https://www150.statcan.gc.ca/t1/wds/rest/getFullTableDownloadCSV/{product_id}/en\"\n",
" response = requests.get(download_url).json()\n",
" zip_url = response['object']\n",
" zip_file_name = f\"{input_folder}/{language}/{product_id}.zip\"\n",
" print(f\"Downloading {zip_url} to {zip_file_name}\")\n",
" response = requests.get(zip_url, stream=True, headers={\"user-agent\": None})\n",
" progress_bar = tqdm(\n",
" desc=zip_file_name,\n",
" total=int(response.headers.get(\"content-length\", 0)),\n",
" unit=\"B\",\n",
" unit_scale=True\n",
" )\n",
" with open(zip_file_name, \"wb\") as handle:\n",
" for chunk in response.iter_content(chunk_size=512):\n",
" if chunk: # filter out keep-alive new chunks\n",
" handle.write(chunk)\n",
" progress_bar.update(len(chunk))\n",
" progress_bar.close()\n",
" \n",
"def process_cube(product_id, language=\"en\"):\n",
" extract_zipfile(product_id, language)\n",
" \"\"\"\n",
" The pandas column reader is better than the Polars one\n",
" Here is an example where polars was not reading it right:\n",
" https://www150.statcan.gc.ca/n1/tbl/csv/98100404-eng.zip\n",
" \"\"\"\n",
" # Get metadata\n",
" metadata_file = f\"{input_folder}/metadata/{product_id}.json\"\n",
" metadata = get_cube_metadata(product_id)\n",
" print(f\"Writing metadata file {metadata_file}\")\n",
" with open(metadata_file, \"w\") as outfile:\n",
" json.dump(metadata, outfile)\n",
" update_last_downloaded(product_id)\n",
" # Read CSV using Pandas\n",
" product_csv = f\"{scratch_folder}/{product_id}.csv\"\n",
" parameters = {\n",
" \"engine\": \"c\",\n",
" \"low_memory\": True\n",
" }\n",
" print(f\"Reading {product_csv} as a Pandas dataframe\")\n",
" df = pd.read_csv(product_csv, **parameters)\n",
" df = convert_to_lowest_type(df)\n",
" print(\"Import Pandas dataframe as a Polars dataframe\")\n",
" df = pl.from_pandas(df)\n",
" output_parquet = f\"{output_folder}/{language}/{product_id}.parquet\"\n",
" print(f\"Exporting dataframe as parquet to {output_parquet}\")\n",
" df.write_parquet(f\"{output_folder}/{language}/{product_id}.parquet\",\n",
" compression='zstd',\n",
" compression_level=22)\n",
" # Remove the scratch files\n",
" print(\"Removing scratch files\")\n",
" os.remove(f\"{scratch_folder}/{product_id}.csv\")\n",
" os.remove(f\"{scratch_folder}/{product_id}_MetaData.csv\") \n"
]
},
{
"cell_type": "code",
"execution_count": 164,
"id": "6be0842e-c1e0-45b4-90e5-0be28b963aed",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Extracting /data/tables/input/en/10100111.zip to /data/tables/scratch\n",
"Writing metadata file /data/tables/input/metadata/10100111.json\n",
"Reading metadata /data/tables/input/metadata/10100111.json\n",
"('10100111', '2025-06-18T12:30:00+00:00')\n",
"Reading /data/tables/scratch/10100111.csv as a Pandas dataframe\n",
"Converting dataframe to optimal data types\n",
"Import Pandas dataframe as a Polars dataframe\n",
"Exporting dataframe as parquet to /data/tables/output/en/10100111.parquet\n",
"Removing scratch files\n"
]
}
],
"source": [
"process_cube(\"10100111\")"
]
},
{
"cell_type": "code",
"execution_count": 194,
"id": "8a193195-bfa6-4df3-adab-0f88025276da",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Downloading https://www150.statcan.gc.ca/n1/tbl/csv/38100164-eng.zip to /data/tables/input/en/38100164.zip\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"/data/tables/input/en/38100164.zip: 100%|██████████████████| 980k/980k [00:00<00:00, 1.27MB/s]\n"
]
}
],
"source": [
"download_cube(\"38100164\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fc4ae317-3e3e-4c27-a2fe-7e6bd12f673f",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
}
},
"nbformat": 4,
"nbformat_minor": 5
}