diff --git a/experiments/statcan_products/process_product_test.ipynb b/experiments/statcan_products/process_product_test.ipynb index 9362329..1f5bdcf 100644 --- a/experiments/statcan_products/process_product_test.ipynb +++ b/experiments/statcan_products/process_product_test.ipynb @@ -3,20 +3,37 @@ { "cell_type": "markdown", "id": "8b767621-b96b-4eaa-a16a-0329bab29c0f", - "metadata": {}, + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, "source": [ "# Notes\n", "- Make sure Statistics Canada knows about these issues:\n", " - When downloading the XML for product_id 98100404 it just returns the structure document, not the data document\n", " - The releaseTime value is in Eastern Standard Zone for https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata and UTC for https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite\n", - " - The releaseTime value is different when getting it from https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite and https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata . For example productId 10100007" + " - The releaseTime value is different when getting it from https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite and https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata . For example productId 10100007\n", + " - Currently having an issue with processing productId 13100442. Expected 18 fields but saw 19 at line 162\n", + " - There are cases where the `REF_DATE` is a range, ex. 2023/2024\n", + " - For productId 17100022 the period is from July 1 to June 30 (seen in the metadata), so can't just use January 1, 2023 and December 31, 2024\n", + " - There can be times when a table's `VALUE` field can have both float and integer data types. 
For example, productId 11100025 has `DECIMAL` values of 0 and 1, corresponding to integer and float data types\n", + " - Need to look into cases where there is no `DGUID`, but the `GEO` value tells you something like `All census metropolitan areas` (productId 11100025). I guess I can do the linking to the CMAs on the API side" ] }, { "cell_type": "code", - "execution_count": 209, + "execution_count": 1, "id": "98859cd6-6fa4-4aef-a113-455699524fae", - "metadata": {}, + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, "outputs": [], "source": [ "from datetime import datetime\n", @@ -29,7 +46,6 @@ "from zoneinfo import ZoneInfo\n", "\n", "import pandas as pd\n", - "import polars as pl\n", "import requests\n", "from tqdm import tqdm\n", "\n", @@ -38,14 +54,14 @@ "scratch_folder = f\"{data_folder}/scratch\"\n", "output_folder = f\"{data_folder}/output\"\n", "\n", - "\n", "if not os.path.exists(f\"{data_folder}/processing.db\"):\n", " con = sqlite3.connect(f\"{data_folder}/processing.db\")\n", " cur = con.cursor()\n", " cur.executescript(\"\"\"\n", " CREATE TABLE IF NOT EXISTS downloaded (\n", " product_id TEXT PRIMARY KEY,\n", - " last_updated TEXT\n", + " last_updated TEXT,\n", + " last_processed TEXT\n", " );\n", "\n", " CREATE TABLE IF NOT EXISTS cubes (\n", @@ -61,7 +77,7 @@ }, { "cell_type": "code", - "execution_count": 210, + "execution_count": 2, "id": "28ac4c01-c1c5-427f-bb2c-0da99a4c5591", "metadata": {}, "outputs": [], @@ -83,7 +99,7 @@ }, { "cell_type": "code", - "execution_count": 211, + "execution_count": 3, "id": "9daa94f4-16c9-4d8b-951e-3d5d38eb618f", "metadata": {}, "outputs": [], @@ -93,9 +109,15 @@ }, { "cell_type": "code", - "execution_count": 212, + "execution_count": 4, "id": "0af9a4b3-7b59-460b-b933-504919d4bd2a", - "metadata": {}, + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, "outputs": [], "source": [ "def update_last_downloaded(product_id):\n", @@ -128,7 
# Supported single-period REF_DATE layouts, most specific first. Each entry is
# (regex the whole value must match, strptime format, offset that rolls the
# period start forward to the last day of the period). The offset is None when
# the period is a single day.
_REF_DATE_LAYOUTS = (
    (r"\d{4}-\d{2}-\d{2}", "%Y-%m-%d", None),
    (r"\d{4}-\d{2}", "%Y-%m", pd.offsets.MonthEnd(0)),
    (r"\d{4}", "%Y", pd.offsets.YearEnd(0)),
)


def compute_ref_date_bounds(df):
    """
    Return a copy of ``df`` with REF_START_DATE / REF_END_DATE columns.

    The two columns are placed directly after ``REF_DATE`` and hold the first
    and last calendar day covered by each ``REF_DATE`` value:

    * ``YYYY-MM-DD`` -> that day for both bounds
    * ``YYYY-MM``    -> first and last day of the month
    * ``YYYY``       -> January 1 and December 31 of the year

    Anything else (missing values, ranges such as ``2023/2024``) is left as
    ``NaT``.

    TODO: There are cases where the REF_DATE is a range, ex. 2023/2024.
    For productId 17100022 the period is from July 1 to June 30 (seen in the
    metadata), so can't just use January 1, 2023 and December 31, 2024.

    Parameters
    ----------
    df : pandas.DataFrame
        Table containing a ``REF_DATE`` column. It is not modified.

    Returns
    -------
    pandas.DataFrame
        New frame with the two bound columns inserted after ``REF_DATE``.
        Existing REF_START_DATE / REF_END_DATE columns are replaced.

    Raises
    ------
    KeyError
        If ``df`` has no ``REF_DATE`` column.
    """
    # Cast to string so year-only tables inferred as integers are supported
    # and missing values become <NA> instead of breaking the .str accessor.
    ref_dates = df["REF_DATE"].astype("string")

    starts = pd.Series(pd.NaT, index=df.index, dtype="datetime64[ns]")
    ends = starts.copy()

    for pattern, date_format, period_end in _REF_DATE_LAYOUTS:
        # na=False keeps the mask strictly boolean; a plain numpy mask makes
        # the assignments below positional, so a duplicated index is harmless.
        selected = ref_dates.str.fullmatch(pattern, na=False).to_numpy(dtype=bool)
        if not selected.any():
            continue

        period_start = pd.to_datetime(
            ref_dates[selected], format=date_format, errors="coerce"
        )
        period_stop = period_start if period_end is None else period_start + period_end

        starts.loc[selected] = period_start.to_numpy()
        ends.loc[selected] = period_stop.to_numpy()

    # Build the result on a new frame so the caller's dataframe is untouched,
    # dropping stale bound columns so the function can be re-run safely.
    result = df.drop(columns=["REF_START_DATE", "REF_END_DATE"], errors="ignore")
    insert_at = result.columns.get_loc("REF_DATE") + 1
    result.insert(insert_at, "REF_START_DATE", starts)
    result.insert(insert_at + 1, "REF_END_DATE", ends)
    return result
"metadata": {}, + "outputs": [], + "source": [ "def extract_zipfile(product_id, language):\n", " \"\"\"\n", " It is faster to extract the zip file and read the CSV, than open\n", @@ -270,74 +347,180 @@ " if chunk: # filter out keep-alive new chunks\n", " handle.write(chunk)\n", " progress_bar.update(len(chunk))\n", - " progress_bar.close()\n", - " \n", - "def process_cube(product_id, language=\"en\"):\n", - " cur.execute(\"SELECT product_id FROM downloaded WHERE product_id = ?\", (product_id,))\n", - " result = cur.fetchone()\n", - " if result:\n", - " print(f\"Already processed {product_id}\")\n", - " return\n", - " extract_zipfile(product_id, language)\n", - " \"\"\"\n", - " The pandas column reader is better than the Polars one\n", - " Here is an example where polars was not reading it right:\n", - " https://www150.statcan.gc.ca/n1/tbl/csv/98100404-eng.zip\n", - " \"\"\"\n", - " # Get metadata\n", - " #metadata_file = f\"{input_folder}/metadata/{product_id}.json\"\n", - " #metadata = get_cube_metadata(product_id)\n", - " #print(f\"Writing metadata file {metadata_file}\")\n", - " #with open(metadata_file, \"w\") as outfile:\n", - " # json.dump(metadata, outfile)\n", - " # Read CSV using Pandas\n", - " product_csv = f\"{scratch_folder}/{product_id}.csv\"\n", - " parameters = {\n", - " \"engine\": \"c\",\n", - " \"low_memory\": True\n", - " }\n", - " print(f\"Reading {product_csv} as a Pandas dataframe\")\n", - " df = pd.read_csv(product_csv, **parameters)\n", - " df = convert_to_lowest_type(df)\n", - " # Fix mixed types. 
# Inlined body of process_cube(product_id, language="en"), kept as top-level
# statements so intermediate results (df, product_csv, ...) can be inspected
# from other notebook cells while debugging.
product_id = "43100011"
language = "en"


def _read_csv_dtypes(csv_columns):
    """Return the dtype mapping passed to ``pd.read_csv``.

    Columns with a known small integer range get a fixed integer dtype, every
    other column is read as ``string``, and ``VALUE`` is omitted so pandas
    infers it (its final dtype is decided after loading).
    """
    fixed_dtypes = {"DECIMALS": "int8", "SCALAR_ID": "int8", "UOM_ID": "int16"}
    return {
        name: fixed_dtypes.get(name, "string")
        for name in csv_columns
        if name != "VALUE"
    }


def _coerce_value_column(frame):
    """Give ``VALUE`` an integer dtype only when the table is integer-valued.

    A table is integer-valued when every row has ``DECIMALS == 0`` and every
    present ``VALUE`` is a whole number. Counting distinct DECIMALS values is
    not enough: a table whose only DECIMALS value is 1 holds floats, and
    casting it to int64 would truncate them. When values are missing the
    nullable ``Int64`` dtype is used, because plain ``int64`` cannot hold NA.
    Tables without a VALUE or DECIMALS column are returned unchanged.
    """
    if "VALUE" not in frame.columns or "DECIMALS" not in frame.columns:
        return frame

    as_float = frame["VALUE"].astype("float64")
    present = as_float.dropna()
    integer_valued = bool((frame["DECIMALS"] == 0).all()) and bool((present % 1 == 0).all())
    if not integer_valued:
        # Example of mixed DECIMALS = 0 and DECIMALS = 1: productId 11100025
        return frame.assign(VALUE=as_float)

    integer_dtype = "Int64" if as_float.isna().any() else "int64"
    # Cast from the original column so large integers never round-trip
    # through float64.
    return frame.astype({"VALUE": integer_dtype})


cur.execute("SELECT product_id FROM downloaded WHERE product_id = ?", (product_id,))
result = cur.fetchone()
if result:
    # Informational only while this runs as a cell: there is no function to
    # return from, so processing continues below.
    print(f"Already processed {product_id}")
    #return
#extract_zipfile(product_id, language)

# The pandas column reader is better than the Polars one
# Here is an example where polars was not reading it right:
# https://www150.statcan.gc.ca/n1/tbl/csv/98100404-eng.zip

# Get metadata
#metadata_file = f"{input_folder}/metadata/{product_id}.json"
#metadata = get_cube_metadata(product_id)
#print(f"Writing metadata file {metadata_file}")
#with open(metadata_file, "w") as outfile:
#    json.dump(metadata, outfile)

# Read CSV using Pandas
product_csv = f"{scratch_folder}/{product_id}.csv"
read_options = {
    "engine": "c",
    "low_memory": True,
    #"nrows": 1000000,
}
# Read only the header first so dtypes can be assigned per column name.
columns = pd.read_csv(product_csv, nrows=0).columns
column_dtypes = _read_csv_dtypes(columns)
if column_dtypes:
    read_options["dtype"] = column_dtypes

print(f"Reading {product_csv} as a Pandas dataframe")
df = pd.read_csv(product_csv, **read_options)

df = _coerce_value_column(df)
df = convert_to_lowest_type(df)
df = compute_ref_date_bounds(df)

output_parquet = f"{output_folder}/{language}/{product_id}.parquet"
# Create the per-language output folder on first use.
os.makedirs(os.path.dirname(output_parquet), exist_ok=True)
print(f"Exporting dataframe as parquet to {output_parquet}")
parquet_options = {
    "path": output_parquet,
    "engine": "pyarrow",
    "compression": "zstd",
    "index": False,
    "compression_level": 22,
}
df.to_parquet(**parquet_options)
import contextlib

# Remove the scratch files
print("Removing scratch files")
for scratch_file in (
    f"{scratch_folder}/{product_id}.csv",
    f"{scratch_folder}/{product_id}_MetaData.csv",
):
    # A previous (failed) run of this cell may already have deleted the file;
    # that must not prevent the bookkeeping below from running.
    with contextlib.suppress(FileNotFoundError):
        os.remove(scratch_file)

# processing.db files created before the last_processed column was added to
# the CREATE TABLE statement do not have it, because the schema is only
# created when the database file does not exist yet. Without this migration
# update_last_processed fails with
# "OperationalError: no such column: last_processed".
# PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
downloaded_columns = {row[1] for row in cur.execute("PRAGMA table_info(downloaded)")}
if downloaded_columns and "last_processed" not in downloaded_columns:
    cur.execute("ALTER TABLE downloaded ADD COLUMN last_processed TEXT")
    con.commit()

update_last_downloaded(product_id)
update_last_processed(product_id)

#process_cube("43100011")