"""Download Statistics Canada data cubes and convert them to Parquet.

Notes
-----
Make sure Statistics Canada knows about these issues:

- When downloading the XML for product_id 98100404 it just returns the
  structure document, not the data document.
- The releaseTime value is in Eastern Time for
  https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata and in UTC for
  https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite
- The releaseTime value is different when getting it from
  https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite and
  https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata .
  For example productId 10100007.
"""

from datetime import datetime
import glob
from multiprocessing import Pool
import json
import os
import sqlite3
import zipfile
from zoneinfo import ZoneInfo

import pandas as pd
import polars as pl
import requests
from tqdm import tqdm

data_folder = "/data/tables"
input_folder = f"{data_folder}/input"
scratch_folder = f"{data_folder}/scratch"
output_folder = f"{data_folder}/output"

# Bookkeeping database. `downloaded` records the release time of every cube
# that has been processed locally; `cubes` mirrors the release times that
# Statistics Canada currently advertises. CREATE TABLE IF NOT EXISTS is
# idempotent, so the schema is applied on every start instead of only when
# the database file is missing.
con = sqlite3.connect('processing.db')
cur = con.cursor()
cur.executescript("""
    CREATE TABLE IF NOT EXISTS downloaded (
        product_id TEXT PRIMARY KEY,
        last_updated TEXT
    );

    CREATE TABLE IF NOT EXISTS cubes (
        product_id TEXT PRIMARY KEY,
        last_updated TEXT
    );
""")
con.commit()


def setup():
    """
    Makes data folders
    """
    folders_to_create = [data_folder, input_folder,
                         scratch_folder, output_folder,
                         f"{input_folder}/en", f"{output_folder}/en",
                         f"{input_folder}/fr", f"{output_folder}/fr",
                         f"{input_folder}/metadata"]
    for folder in folders_to_create:
        if not os.path.exists(folder):
            print(f"Making folder {folder}")
            # makedirs also creates missing parents (for example /data),
            # where os.mkdir would raise FileNotFoundError.
            os.makedirs(folder, exist_ok=True)


setup()


def update_last_downloaded(product_id):
    """
    Record the release time of a processed cube in the `downloaded` table.

    Reads the metadata JSON previously saved for `product_id`, converts its
    "releaseTime" from Eastern Time to UTC and upserts the row.

    Raises KeyError if the metadata file has no "object" entry.
    """
    filepath = f"{input_folder}/metadata/{product_id}.json"
    print(f"Reading metadata {filepath}")
    with open(filepath, 'r') as fp:
        metadata = json.load(fp)
    cube = metadata["object"]
    # The id stored is the one reported by the metadata document itself.
    product_id = cube.get("productId")
    # Convert last_updated to UTC since /getAllCubesListLite uses UTC
    release_time = datetime.strptime(cube.get("releaseTime"), "%Y-%m-%dT%H:%M")
    release_time = release_time.replace(tzinfo=ZoneInfo("America/Toronto"))
    last_updated = release_time.astimezone(ZoneInfo("UTC")).isoformat()

    # product_id is the primary key, so OR REPLACE acts as insert-or-update.
    cur.execute("INSERT OR REPLACE INTO downloaded VALUES (?, ?)",
                (product_id, last_updated))
    con.commit()


def update_tables():
    """
    Refresh the `cubes` table and re-process every cube that has a newer release.

    This currently does not work as expected because Statistics Canada has discrepancies.
    The "releaseTime" listed in https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite
    for every productId is not the same as "releaseTime" listed when making a POST
    https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata , for example:
    [{"productId":10100007}]
    """
    response = requests.get("https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite")
    response.raise_for_status()

    cubes_metadata_new = []
    for cube in response.json():
        # Update the date field so it is formatted the same as date field in downloaded table.
        # The value is already UTC, so the zone is attached with replace();
        # astimezone() on a naive datetime would first assume system local time.
        released = datetime.strptime(cube["releaseTime"], "%Y-%m-%dT%H:%M:%SZ")
        released = released.replace(tzinfo=ZoneInfo("UTC"))
        cubes_metadata_new.append((cube["productId"], released.isoformat()))

    # Replace the old snapshot only after the new one was fetched and parsed,
    # in a single transaction, so a failed request cannot leave `cubes` empty.
    cur.execute("DELETE FROM cubes")
    cur.executemany("INSERT INTO cubes VALUES(?, ?)", cubes_metadata_new)
    con.commit()

    cur.execute("""
        SELECT a.product_id
        FROM downloaded AS a,
             cubes AS b
        WHERE a.product_id = b.product_id
        AND b.last_updated > a.last_updated
    """)
    # Materialise the rows first: process_cube() reuses the shared cursor.
    for (product_id,) in cur.fetchall():
        print(f"Updating product_id: {product_id}")
        # Fetch the new release first; otherwise the old zip is re-processed.
        download_cube(product_id)
        process_cube(product_id)


# Cubes whose advertised release time is newer than the one recorded locally.
cur.execute("""
    SELECT a.product_id, a.last_updated, b.last_updated
    FROM downloaded AS a,
         cubes AS b
    WHERE a.product_id = b.product_id
    AND b.last_updated > a.last_updated
""")
difference = pd.DataFrame(cur.fetchall(), columns=["product_id", "release_time_metadata", "release_time_cubelist"])
# Output recorded when this was last run in the notebook (97 rows), e.g.:
#   product_id  release_time_metadata      release_time_cubelist
#   10100006    2025-05-21T12:30:00+00:00  2025-06-17T12:30:00+00:00
#   10100108    2025-05-13T12:30:00+00:00  2025-06-12T12:30:00+00:00
#   38100164    2023-09-13T12:30:00+00:00  2025-06-17T12:30:00+00:00
print(difference)


def convert_to_lowest_type(df):
    """
    Convert columns to the best possible dtypes
    For example, if the column is numerical and has a maximum value of 32,000
    we can assign it a type of int16

    Returns the converted dataframe.
    """
    print("Converting dataframe to optimal data types")
    df = df.convert_dtypes(convert_string=False, convert_boolean=False)

    # Downcast to the smallest numerical dtype
    # Skipping downcasting Float64 as there were issues with decimal places
    # For example, instead of a value being 65.4, it turned into 65.4000015258789
    integer_columns = [column for column, dtype in df.dtypes.items()
                       if str(dtype) == 'Int64']
    for column in integer_columns:
        df[column] = pd.to_numeric(df[column], downcast='integer')

    return df


def extract_zipfile(product_id, language):
    """
    Extract the downloaded zip of `product_id` into the scratch folder.

    It is faster to extract the zip file and read the CSV, than open
    via zipfile and then Pandas
    """
    zip_file = f"{input_folder}/{language}/{product_id}.zip"
    with zipfile.ZipFile(zip_file) as myzip:
        print(f"Extracting {zip_file} to {scratch_folder}")
        myzip.extractall(path=scratch_folder)


def get_cube_metadata(product_id):
    """
    Return the getCubeMetadata response entry for `product_id`.

    Raises requests.HTTPError when the service answers with an error status.
    """
    url = "https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata"
    cubes_payload = [{"productId": product_id}]
    response = requests.post(url, json=cubes_payload)
    response.raise_for_status()
    # One payload entry was sent, so the first response entry is the answer.
    return response.json()[0]


def download_cube(product_id, language="en"):
    """
    Download the full-table CSV zip of `product_id` into the input folder.

    `language` selects both the requested download and the destination
    sub-folder. Raises requests.HTTPError when either request fails.
    """
    # The requested language must match the folder the zip is stored in.
    download_url = f"https://www150.statcan.gc.ca/t1/wds/rest/getFullTableDownloadCSV/{product_id}/{language}"
    response = requests.get(download_url)
    response.raise_for_status()
    zip_url = response.json()['object']
    zip_file_name = f"{input_folder}/{language}/{product_id}.zip"
    # Stream into a temporary name so an interrupted transfer never leaves
    # a truncated archive under the final file name.
    partial_file_name = f"{zip_file_name}.part"
    print(f"Downloading {zip_url} to {zip_file_name}")
    with requests.get(zip_url, stream=True, headers={"user-agent": None}) as response:
        response.raise_for_status()
        with open(partial_file_name, "wb") as handle, tqdm(
            desc=zip_file_name,
            total=int(response.headers.get("content-length", 0)),
            unit="B",
            unit_scale=True
        ) as progress_bar:
            for chunk in response.iter_content(chunk_size=64 * 1024):
                if chunk:  # filter out keep-alive new chunks
                    handle.write(chunk)
                    progress_bar.update(len(chunk))
    os.replace(partial_file_name, zip_file_name)


def process_cube(product_id, language="en"):
    """
    Convert the downloaded zip of `product_id` into a zstd-compressed Parquet file.

    Also saves the cube metadata as JSON and, once the Parquet file has been
    written, records the release time in the `downloaded` table.
    """
    extract_zipfile(product_id, language)
    scratch_files = [f"{scratch_folder}/{product_id}.csv",
                     f"{scratch_folder}/{product_id}_MetaData.csv"]
    try:
        # Get metadata
        metadata_file = f"{input_folder}/metadata/{product_id}.json"
        metadata = get_cube_metadata(product_id)
        print(f"Writing metadata file {metadata_file}")
        with open(metadata_file, "w") as outfile:
            json.dump(metadata, outfile)

        # Read CSV using Pandas
        # The pandas column reader is better than the Polars one
        # Here is an example where polars was not reading it right:
        # https://www150.statcan.gc.ca/n1/tbl/csv/98100404-eng.zip
        product_csv = scratch_files[0]
        print(f"Reading {product_csv} as a Pandas dataframe")
        df = pd.read_csv(product_csv, engine="c", low_memory=True)
        df = convert_to_lowest_type(df)
        print("Import Pandas dataframe as a Polars dataframe")
        df = pl.from_pandas(df)
        output_parquet = f"{output_folder}/{language}/{product_id}.parquet"
        print(f"Exporting dataframe as parquet to {output_parquet}")
        df.write_parquet(output_parquet,
                         compression='zstd',
                         compression_level=22)
        # Only mark the cube as downloaded once its Parquet file exists, so
        # a failed conversion is retried by the next update_tables() run.
        update_last_downloaded(product_id)
    finally:
        # Remove the scratch files, even when the conversion failed
        print("Removing scratch files")
        for scratch_file in scratch_files:
            if os.path.exists(scratch_file):
                os.remove(scratch_file)


process_cube("10100111")

download_cube("38100164")