Had to optimize the code. Leaving it outside of function for now in case I need to continue working on it

This commit is contained in:
Diego Ripley
2025-06-20 16:00:51 -04:00
parent f6d88c5fd0
commit e836363cd1
@@ -3,20 +3,37 @@
{
"cell_type": "markdown",
"id": "8b767621-b96b-4eaa-a16a-0329bab29c0f",
"metadata": {},
"metadata": {
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": []
},
"source": [
"# Notes\n",
"- Make sure Statistics Canada knows about these issues:\n",
" - When downloading the XML for product_id 98100404 it just returns the structure document, not the data document\n",
" - The releaseTime value is in Eastern Standard Zone for https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata and UTC for https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite\n",
" - The releaseTime value is different when getting it from https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite and https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata . For example productId 10100007"
" - The releaseTime value is different when getting it from https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite and https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata . For example productId 10100007\n",
" - Currently having an issue with processing productId 13100442. Expected 18 fields but saw 19 at line 162\n",
" - There are cases where the `REF_DATE` is a range, ex. 2023/2024\n",
" - For productId 17100022 the period is from July 1 to June 30 (seen in the metadata), so can't just use January 1, 2023 and December 31, 2024\n",
" - There can be times when a table's `VALUE` field can have both float and integer data types. For example, productId 11100025 has `DECIMAL` values of 0 and 1, corresponding to integer and float data types\n",
" - Need to look into cases where there is no `DGUID`, but the `GEO` value tells you something like `All census metropolitan areas` (productId 11100025). I guess I can do the linking to the CMAs on the API side"
]
},
{
"cell_type": "code",
"execution_count": 209,
"execution_count": 1,
"id": "98859cd6-6fa4-4aef-a113-455699524fae",
"metadata": {},
"metadata": {
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": []
},
"outputs": [],
"source": [
"from datetime import datetime\n",
@@ -29,7 +46,6 @@
"from zoneinfo import ZoneInfo\n",
"\n",
"import pandas as pd\n",
"import polars as pl\n",
"import requests\n",
"from tqdm import tqdm\n",
"\n",
@@ -38,14 +54,14 @@
"scratch_folder = f\"{data_folder}/scratch\"\n",
"output_folder = f\"{data_folder}/output\"\n",
"\n",
"\n",
"if not os.path.exists(f\"{data_folder}/processing.db\"):\n",
" con = sqlite3.connect(f\"{data_folder}/processing.db\")\n",
" cur = con.cursor()\n",
" cur.executescript(\"\"\"\n",
" CREATE TABLE IF NOT EXISTS downloaded (\n",
" product_id TEXT PRIMARY KEY,\n",
" last_updated TEXT\n",
" last_updated TEXT,\n",
" last_processed TEXT\n",
" );\n",
"\n",
" CREATE TABLE IF NOT EXISTS cubes (\n",
@@ -61,7 +77,7 @@
},
{
"cell_type": "code",
"execution_count": 210,
"execution_count": 2,
"id": "28ac4c01-c1c5-427f-bb2c-0da99a4c5591",
"metadata": {},
"outputs": [],
@@ -83,7 +99,7 @@
},
{
"cell_type": "code",
"execution_count": 211,
"execution_count": 3,
"id": "9daa94f4-16c9-4d8b-951e-3d5d38eb618f",
"metadata": {},
"outputs": [],
@@ -93,9 +109,15 @@
},
{
"cell_type": "code",
"execution_count": 212,
"execution_count": 4,
"id": "0af9a4b3-7b59-460b-b933-504919d4bd2a",
"metadata": {},
"metadata": {
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": []
},
"outputs": [],
"source": [
"def update_last_downloaded(product_id):\n",
@@ -128,7 +150,7 @@
},
{
"cell_type": "code",
"execution_count": 213,
"execution_count": 5,
"id": "4b7996d2-75ab-4173-a17a-64fb7ab63740",
"metadata": {},
"outputs": [],
@@ -141,7 +163,7 @@
},
{
"cell_type": "code",
"execution_count": 214,
"execution_count": 6,
"id": "23cbabc3-0d4b-4e28-a4df-b2c5e8c7ea8b",
"metadata": {},
"outputs": [],
@@ -190,7 +212,7 @@
},
{
"cell_type": "code",
"execution_count": 215,
"execution_count": 7,
"id": "dc5573ef-734b-44d8-a4c4-0df19d655975",
"metadata": {},
"outputs": [],
@@ -200,8 +222,66 @@
},
{
"cell_type": "code",
"execution_count": 228,
"id": "144e3716-b0e7-4a39-9a25-ededea506f4f",
"execution_count": null,
"id": "eddf6501-8428-44cc-8d2d-e245803a3943",
"metadata": {
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": []
},
"outputs": [],
"source": [
"def compute_ref_date_bounds(df):\n",
" \"\"\"\n",
" TODO: There are cases where the REF_DATE is a range, ex. 2023/2024.\n",
" For productId 17100022 the period is from July 1 to June 30 (seen in the metadata), so can't just \n",
" use January 1, 2023 and December 31, 2024\n",
" \"\"\"\n",
" series = df[\"REF_DATE\"]\n",
"\n",
" # Initialize the two new columns with NaT\n",
" df[\"REF_START_DATE\"] = pd.NaT\n",
" df[\"REF_END_DATE\"] = pd.NaT\n",
"\n",
" # Skip rows that contain slashes\n",
" valid_mask = ~series.str.contains(\"/\", na=False)\n",
"\n",
" # Case 1: YYYY-MM-DD\n",
" full_mask = valid_mask & series.str.fullmatch(r\"\\d{4}-\\d{2}-\\d{2}\")\n",
" parsed_full = pd.to_datetime(series[full_mask], format=\"%Y-%m-%d\", errors=\"coerce\")\n",
" df.loc[full_mask, \"REF_START_DATE\"] = parsed_full\n",
" df.loc[full_mask, \"REF_END_DATE\"] = parsed_full\n",
"\n",
" # Case 2: YYYY-MM\n",
" month_mask = valid_mask & series.str.fullmatch(r\"\\d{4}-\\d{2}\")\n",
" parsed_month = pd.to_datetime(series[month_mask], format=\"%Y-%m\", errors=\"coerce\")\n",
" df.loc[month_mask, \"REF_START_DATE\"] = parsed_month\n",
" df.loc[month_mask, \"REF_END_DATE\"] = parsed_month + pd.to_timedelta(\n",
" parsed_month.dt.days_in_month - 1, unit='D'\n",
" )\n",
"\n",
" # Case 3: YYYY\n",
" year_mask = valid_mask & series.str.fullmatch(r\"\\d{4}\")\n",
" parsed_year = pd.to_datetime(series[year_mask], format=\"%Y\", errors=\"coerce\")\n",
" df.loc[year_mask, \"REF_START_DATE\"] = parsed_year\n",
" df.loc[year_mask, \"REF_END_DATE\"] = parsed_year + pd.offsets.YearEnd(0)\n",
"\n",
" # Move columns after REF_DATE\n",
" ref_idx = df.columns.get_loc(\"REF_DATE\")\n",
" cols = list(df.columns)\n",
" cols.remove(\"REF_START_DATE\")\n",
" cols.remove(\"REF_END_DATE\")\n",
" cols[ref_idx + 1:ref_idx + 1] = [\"REF_START_DATE\", \"REF_END_DATE\"]\n",
"\n",
" return df[cols]"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "e67642d3-cc6c-4c5d-b5a3-2fe18364ad71",
"metadata": {},
"outputs": [],
"source": [
@@ -211,27 +291,24 @@
" For example, if the column is numerical and has a maximum value of 32,000 \n",
" we can assign it a type of int16\n",
" \"\"\"\n",
" print(\"Converting dataframe to optimal data types\")\n",
" params = {\n",
" 'convert_string': False,\n",
" 'convert_boolean': False\n",
" }\n",
" df = df.convert_dtypes(**params)\n",
"\n",
" dtypes = pd.DataFrame(df.dtypes)\n",
" # Downcast to the smallest numerical dtype\n",
" for row in dtypes.itertuples():\n",
" column = row[0]\n",
" the_type = str(row[1])\n",
" # Skipping downcasting Float64 as there were issues with decimal places\n",
" # For example, instead of a value being 65.4, it turned into 65.4000015258789\n",
" if the_type == 'Float64':\n",
" continue\n",
" elif the_type == 'Int64':\n",
" if the_type == 'int64':\n",
" df[column] = pd.to_numeric(df[column], downcast='integer')\n",
"\n",
" return df\n",
"\n",
" return df"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "144e3716-b0e7-4a39-9a25-ededea506f4f",
"metadata": {},
"outputs": [],
"source": [
"def extract_zipfile(product_id, language):\n",
" \"\"\"\n",
" It is faster to extract the zip file and read the CSV, than open\n",
@@ -270,74 +347,180 @@
" if chunk: # filter out keep-alive new chunks\n",
" handle.write(chunk)\n",
" progress_bar.update(len(chunk))\n",
" progress_bar.close()\n",
" \n",
"def process_cube(product_id, language=\"en\"):\n",
" cur.execute(\"SELECT product_id FROM downloaded WHERE product_id = ?\", (product_id,))\n",
" result = cur.fetchone()\n",
" if result:\n",
" print(f\"Already processed {product_id}\")\n",
" return\n",
" extract_zipfile(product_id, language)\n",
" \"\"\"\n",
" The pandas column reader is better than the Polars one\n",
" Here is an example where polars was not reading it right:\n",
" https://www150.statcan.gc.ca/n1/tbl/csv/98100404-eng.zip\n",
" \"\"\"\n",
" # Get metadata\n",
" #metadata_file = f\"{input_folder}/metadata/{product_id}.json\"\n",
" #metadata = get_cube_metadata(product_id)\n",
" #print(f\"Writing metadata file {metadata_file}\")\n",
" #with open(metadata_file, \"w\") as outfile:\n",
" # json.dump(metadata, outfile)\n",
" # Read CSV using Pandas\n",
" product_csv = f\"{scratch_folder}/{product_id}.csv\"\n",
" parameters = {\n",
" \"engine\": \"c\",\n",
" \"low_memory\": True\n",
" }\n",
" print(f\"Reading {product_csv} as a Pandas dataframe\")\n",
" df = pd.read_csv(product_csv, **parameters)\n",
" df = convert_to_lowest_type(df)\n",
" # Fix mixed types. This happened in productId 11100147\n",
" for col in df.select_dtypes(include=[\"object\"]).columns:\n",
" types_in_col = df[col].map(type).unique()\n",
" # If there's any non-str type in the column\n",
" if any(t != str for t in types_in_col):\n",
" print(f\"Fixing mixed types in column: {col} — types found: {types_in_col}\")\n",
" # Replace non-str values with None\n",
" df[col] = df[col].where(df[col].map(type) == str, None).astype(\"string\")\n",
" print(\"Import Pandas dataframe as a Polars dataframe\")\n",
" df = pl.from_pandas(df)\n",
" output_parquet = f\"{output_folder}/{language}/{product_id}.parquet\"\n",
" print(f\"Exporting dataframe as parquet to {output_parquet}\")\n",
" df.write_parquet(output_parquet,\n",
" compression='zstd',\n",
" compression_level=22)\n",
" # Remove the scratch files\n",
" print(\"Removing scratch files\")\n",
" os.remove(f\"{scratch_folder}/{product_id}.csv\")\n",
" os.remove(f\"{scratch_folder}/{product_id}_MetaData.csv\")\n",
" update_last_downloaded(product_id)\n",
" update_last_processed(product_id)"
" progress_bar.close()"
]
},
{
"cell_type": "code",
"execution_count": 229,
"id": "6be0842e-c1e0-45b4-90e5-0be28b963aed",
"metadata": {},
"execution_count": 12,
"id": "858e405e-7c02-4193-8abe-f23951761b09",
"metadata": {
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": []
},
"outputs": [],
"source": [
"product_id = \"43100011\"\n",
"#def process_cube(product_id, language=\"en\"):\n",
"language = \"en\"\n",
"cur.execute(\"SELECT product_id FROM downloaded WHERE product_id = ?\", (product_id,))\n",
"result = cur.fetchone()\n",
"if result:\n",
" print(f\"Already processed {product_id}\")\n",
" #return\n",
"#extract_zipfile(product_id, language)\n",
"\"\"\"\n",
"The pandas column reader is better than the Polars one\n",
"Here is an example where polars was not reading it right:\n",
"https://www150.statcan.gc.ca/n1/tbl/csv/98100404-eng.zip\n",
"\"\"\"\n",
"# Get metadata\n",
"#metadata_file = f\"{input_folder}/metadata/{product_id}.json\"\n",
"#metadata = get_cube_metadata(product_id)\n",
"#print(f\"Writing metadata file {metadata_file}\")\n",
"#with open(metadata_file, \"w\") as outfile:\n",
"# json.dump(metadata, outfile)\n",
"# Read CSV using Pandas\n",
"product_csv = f\"{scratch_folder}/{product_id}.csv\"\n",
"parameters = {\n",
" \"engine\": \"c\",\n",
" \"low_memory\": True,\n",
" #\"nrows\": 1000000,\n",
" \"dtype\": {}\n",
"}\n",
"columns = pd.read_csv(product_csv, nrows=0).columns\n",
"\n",
"columns_always_int_8 = [\"DECIMALS\", \"SCALAR_ID\"]\n",
"for column in columns_always_int_8:\n",
" if column in columns:\n",
" parameters[\"dtype\"][column] = 'int8'\n",
"\n",
"columns_always_int_16 = [\"UOM_ID\"]\n",
"for column in columns_always_int_16:\n",
" if column in columns:\n",
" parameters[\"dtype\"][column] = 'int16'\n",
"\n",
"for column in columns:\n",
" if column not in columns_always_int_8 and column not in columns_always_int_16 and column != \"VALUE\":\n",
" parameters[\"dtype\"][column] = 'string'\n",
"\n",
"if not parameters[\"dtype\"]:\n",
" del parameters[\"dtype\"]\n",
"\n",
"print(f\"Reading {product_csv} as a Pandas dataframe\")\n",
"df = pd.read_csv(product_csv, **parameters)\n",
"\n",
"unique_decimal_values = len(df[\"DECIMALS\"].unique())\n",
"if unique_decimal_values > 1:\n",
" \"\"\"\n",
" Example of when you can have a table with DECIMALS = 0 and DECIMALS = 1, productId 11100025\n",
" \"\"\"\n",
" # Turn to float if not already\n",
" convert_dict = {\"VALUE\": \"float64\"}\n",
" df = df.astype(convert_dict)\n",
"elif unique_decimal_values == 1:\n",
" if df[\"VALUE\"].dtype != \"int64\":\n",
" # Turn to int64 if not already\n",
" convert_dict = {\"VALUE\": \"int64\"}\n",
" df = df.astype(convert_dict)\n",
"\n",
"df = convert_to_lowest_type(df)\n",
"df = compute_ref_date_bounds(df)"
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "49cc1fa3-1ac8-4510-b4fd-7827a041e4a9",
"metadata": {
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Already processed 11100147\n"
"Exporting dataframe as parquet to /data/tables/output/en/43100011.parquet\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"IOStream.flush timed out\n"
]
}
],
"source": [
"process_cube(\"11100147\")"
"output_parquet = f\"{output_folder}/{language}/{product_id}.parquet\"\n",
"print(f\"Exporting dataframe as parquet to {output_parquet}\")\n",
"parameters = {\n",
" \"path\": output_parquet,\n",
" \"engine\": \"pyarrow\",\n",
" \"compression\": \"zstd\",\n",
" \"index\": False,\n",
" \"compression_level\": 22\n",
"}\n",
"df.to_parquet(**parameters)"
]
},
{
"cell_type": "code",
"execution_count": 26,
"id": "788bc668-8057-4e06-91a3-b99991e0a410",
"metadata": {
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Removing scratch files\n",
"Reading metadata /data/tables/input/metadata/43100011.json\n"
]
},
{
"ename": "OperationalError",
"evalue": "no such column: last_processed",
"output_type": "error",
"traceback": [
"\u001b[31m---------------------------------------------------------------------------\u001b[39m",
"\u001b[31mOperationalError\u001b[39m Traceback (most recent call last)",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[26]\u001b[39m\u001b[32m, line 6\u001b[39m\n\u001b[32m 4\u001b[39m os.remove(\u001b[33mf\u001b[39m\u001b[33m\"\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mscratch_folder\u001b[38;5;132;01m}\u001b[39;00m\u001b[33m/\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mproduct_id\u001b[38;5;132;01m}\u001b[39;00m\u001b[33m_MetaData.csv\u001b[39m\u001b[33m\"\u001b[39m)\n\u001b[32m 5\u001b[39m update_last_downloaded(product_id)\n\u001b[32m----> \u001b[39m\u001b[32m6\u001b[39m \u001b[43mupdate_last_processed\u001b[49m\u001b[43m(\u001b[49m\u001b[43mproduct_id\u001b[49m\u001b[43m)\u001b[49m\n",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[5]\u001b[39m\u001b[32m, line 3\u001b[39m, in \u001b[36mupdate_last_processed\u001b[39m\u001b[34m(product_id)\u001b[39m\n\u001b[32m 1\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34mupdate_last_processed\u001b[39m(product_id):\n\u001b[32m 2\u001b[39m time_finished_processing = datetime.now().isoformat()\n\u001b[32m----> \u001b[39m\u001b[32m3\u001b[39m \u001b[43mcur\u001b[49m\u001b[43m.\u001b[49m\u001b[43mexecute\u001b[49m\u001b[43m(\u001b[49m\u001b[33;43m\"\u001b[39;49m\u001b[33;43mUPDATE downloaded SET last_processed = ? WHERE product_id = ?\u001b[39;49m\u001b[33;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m(\u001b[49m\u001b[43mtime_finished_processing\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mproduct_id\u001b[49m\u001b[43m)\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 4\u001b[39m con.commit()\n",
"\u001b[31mOperationalError\u001b[39m: no such column: last_processed"
]
}
],
"source": [
"# Remove the scratch files\n",
"print(\"Removing scratch files\")\n",
"os.remove(f\"{scratch_folder}/{product_id}.csv\")\n",
"os.remove(f\"{scratch_folder}/{product_id}_MetaData.csv\")\n",
"update_last_downloaded(product_id)\n",
"update_last_processed(product_id)"
]
},
{
"cell_type": "code",
"execution_count": 24,
"id": "06fb89ad-77ba-46db-bb88-15f5636d707d",
"metadata": {},
"outputs": [],
"source": [
"#process_cube(\"43100011\")"
]
}
],