diff --git a/experiments/statcan_products/process_files_multiprocessing.py b/experiments/statcan_products/process_files_multiprocessing.py index bd08483..73c7cad 100644 --- a/experiments/statcan_products/process_files_multiprocessing.py +++ b/experiments/statcan_products/process_files_multiprocessing.py @@ -8,7 +8,6 @@ import zipfile from zoneinfo import ZoneInfo import pandas as pd -import polars as pl import requests from tqdm import tqdm @@ -17,7 +16,6 @@ input_folder = f"{data_folder}/input" scratch_folder = f"{data_folder}/scratch" output_folder = f"{data_folder}/output" - if not os.path.exists(f"{data_folder}/processing.db"): con = sqlite3.connect(f"{data_folder}/processing.db") cur = con.cursor() @@ -42,7 +40,7 @@ def setup(): """ Makes data folders """ - folders_to_create = [data_folder, input_folder, + folders_to_create = [data_folder, input_folder, scratch_folder, output_folder, f"{input_folder}/en", f"{output_folder}/en", f"{input_folder}/fr", f"{output_folder}/fr", @@ -125,29 +123,62 @@ def update_tables(): download_cube(product_id) process_cube(product_id) +def compute_ref_date_bounds(df): + """ + TODO: There are cases where the REF_DATE is a range, ex. 2023/2024. + For productId 17100022 the period is from July 1 to June 30 (seen in the metadata), so can't just + use January 1, 2023 and December 31, 2024 + """ + series = df["REF_DATE"] + + # Initialize the two new columns with NaT + df["REF_START_DATE"] = pd.NaT + df["REF_END_DATE"] = pd.NaT + + # Skip rows that contain slashes + valid_mask = ~series.str.contains("/", na=False) + + # Case 1: YYYY-MM-DD + full_mask = valid_mask & series.str.fullmatch(r"\d{4}-\d{2}-\d{2}") + parsed_full = pd.to_datetime(series[full_mask], format="%Y-%m-%d", errors="coerce") + df.loc[full_mask, "REF_START_DATE"] = parsed_full + df.loc[full_mask, "REF_END_DATE"] = parsed_full + + # Case 2: YYYY-MM + month_mask = valid_mask & series.str.fullmatch(r"\d{4}-\d{2}") + parsed_month = pd.to_datetime(series[month_mask], format="%Y-%m", errors="coerce") + df.loc[month_mask, "REF_START_DATE"] = parsed_month + df.loc[month_mask, "REF_END_DATE"] = parsed_month + pd.to_timedelta( + parsed_month.dt.days_in_month - 1, unit='D' + ) + + # Case 3: YYYY + year_mask = valid_mask & series.str.fullmatch(r"\d{4}") + parsed_year = pd.to_datetime(series[year_mask], format="%Y", errors="coerce") + df.loc[year_mask, "REF_START_DATE"] = parsed_year + df.loc[year_mask, "REF_END_DATE"] = parsed_year + pd.offsets.YearEnd(0) + + # Move columns after REF_DATE + ref_idx = df.columns.get_loc("REF_DATE") + cols = list(df.columns) + cols.remove("REF_START_DATE") + cols.remove("REF_END_DATE") + cols[ref_idx + 1:ref_idx + 1] = ["REF_START_DATE", "REF_END_DATE"] + + return df[cols] + def convert_to_lowest_type(df): """ Convert columns to the best possible dtypes - For example, if the column is numerical and has a maximum value of 32,000 + For example, if the column is numerical and has a maximum value of 32,000 we can assign it a type of int16 """ - print("Converting dataframe to optimal data types") - params = { - 'convert_string': False, - 'convert_boolean': False - } - df = df.convert_dtypes(**params) - dtypes = pd.DataFrame(df.dtypes) # Downcast to the smallest numerical dtype for row in dtypes.itertuples(): column = row[0] the_type = str(row[1]) - # Skipping downcasting Float64 as there were issues with decimal places - # For example, instead of a value being 65.4, it turned into 65.4000015258789 - if the_type == 'Float64': - continue - elif the_type == 'Int64': + if the_type == 'int64': df[column] = pd.to_numeric(df[column], downcast='integer') return df @@ -193,6 +224,17 @@ def download_cube(product_id, language="en"): progress_bar.close() def process_cube(product_id, language="en"): + """ + Examples: + - productId 43100011 has all with DECIMAL = 1 (float64) + - productId 17100009 has DECIMAL = 0 (int64) + - productId 35100076 has multiple DECIMAL precisions [0, 1, 2] (int64, float64, float64) + """ + cur.execute("SELECT product_id FROM downloaded WHERE product_id = ?", (product_id,)) + result = cur.fetchone() + if result: + print(f"Already processed {product_id}") + return extract_zipfile(product_id, language) """ The pandas column reader is better than the Polars one @@ -207,20 +249,63 @@ def process_cube(product_id, language="en"): # json.dump(metadata, outfile) # Read CSV using Pandas product_csv = f"{scratch_folder}/{product_id}.csv" + print(f"Reading {product_csv}") parameters = { "engine": "c", - "low_memory": True + "low_memory": True, + "nrows": 100000, + "dtype": {} } + columns = pd.read_csv(product_csv, nrows=0).columns + + columns_always_int_8 = ["DECIMALS", "SCALAR_ID"] + for column in columns_always_int_8: + if column in columns: + parameters["dtype"][column] = 'int8' + + columns_always_int_16 = ["UOM_ID"] + for column in columns_always_int_16: + if column in columns: + parameters["dtype"][column] = 'int16' + + for column in columns: + if column not in columns_always_int_8 and column not in columns_always_int_16 and column != "VALUE": + parameters["dtype"][column] = 'string' + + if not parameters["dtype"]: + del parameters["dtype"] + print(f"Reading {product_csv} as a Pandas dataframe") df = pd.read_csv(product_csv, **parameters) + unique_decimal_values = df["DECIMALS"].unique() + if any(unique_decimal_values): + """ + A table can have both float and integer in the VALUE field. + productId 11100025 is an example + So if we have unique values for DECIMALS to be [0,1], then we convert to float64 + """ + convert_dict = {"VALUE": "float64"} + print(convert_dict) + df = df.astype(convert_dict) + elif 0 in (unique_decimal_values): + if df["VALUE"].dtype != "Int64": + # If DECIMALS = [0] + convert_dict = {"VALUE": "Int64"} + print(convert_dict) + df = df.astype(convert_dict) + df = convert_to_lowest_type(df) - print("Import Pandas dataframe as a Polars dataframe") - df = pl.from_pandas(df) + df = compute_ref_date_bounds(df) output_parquet = f"{output_folder}/{language}/{product_id}.parquet" print(f"Exporting dataframe as parquet to {output_parquet}") - df.write_parquet(output_parquet, - compression='zstd', - compression_level=22) + parameters = { + "path": output_parquet, + "engine": "pyarrow", + "compression": "zstd", + "index": False, + "compression_level": 22 + } + df.to_parquet(**parameters) # Remove the scratch files print("Removing scratch files") os.remove(f"{scratch_folder}/{product_id}.csv") @@ -228,6 +313,7 @@ def process_cube(product_id, language="en"): update_last_downloaded(product_id) update_last_processed(product_id) + if __name__ == '__main__': setup() files_to_process = glob.glob(f"{input_folder}/en/*.zip") @@ -235,5 +321,7 @@ if __name__ == '__main__': files_to_process = [x.split("/")[-1].split(".zip")[0] for x in files_to_process] to_process = len(files_to_process) print(f"Processing {to_process}") - with Pool(4) as p: - p.map(process_cube, files_to_process) + #for product_id in files_to_process: + # process_cube(product_id) + with Pool(processes=16) as p: + p.map(process_cube, files_to_process, chunksize=8) diff --git a/experiments/statcan_products/process_product_test.ipynb b/experiments/statcan_products/process_product_test.ipynb index 1f5bdcf..ea26878 100644 --- a/experiments/statcan_products/process_product_test.ipynb +++ b/experiments/statcan_products/process_product_test.ipynb @@ -25,7 +25,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": 20, "id": "98859cd6-6fa4-4aef-a113-455699524fae", "metadata": { "editable": true, @@ -77,7 +77,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 21, "id": "28ac4c01-c1c5-427f-bb2c-0da99a4c5591", "metadata": {}, "outputs": [], @@ -99,7 +99,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 22, "id": "9daa94f4-16c9-4d8b-951e-3d5d38eb618f", "metadata": {}, "outputs": [], @@ -109,7 +109,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 23, "id": "0af9a4b3-7b59-460b-b933-504919d4bd2a", "metadata": { "editable": true, @@ -136,12 +136,12 @@ " last_updated = datetime.strptime(last_updated, \"%Y-%m-%dT%H:%M\")\n", " last_updated = last_updated.replace(tzinfo=ZoneInfo(\"America/Toronto\"))\n", " last_updated = last_updated.astimezone(ZoneInfo(\"UTC\")).isoformat()\n", - " \n", + "\n", " data = (product_id, last_updated)\n", " cur.execute(\"SELECT product_id FROM downloaded WHERE product_id = ?\", (product_id,))\n", " result = cur.fetchone()\n", " if not result:\n", - " cur.execute(\"INSERT INTO downloaded VALUES (?, ?)\", data)\n", + " cur.execute(\"INSERT INTO downloaded (product_id, last_updated) VALUES (?, ?)\", data)\n", " else:\n", " cur.execute(\"UPDATE downloaded SET last_updated = ? WHERE product_id = ?\", (last_updated, product_id))\n", "\n", @@ -150,7 +150,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 24, "id": "4b7996d2-75ab-4173-a17a-64fb7ab63740", "metadata": {}, "outputs": [], @@ -163,7 +163,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 25, "id": "23cbabc3-0d4b-4e28-a4df-b2c5e8c7ea8b", "metadata": {}, "outputs": [], @@ -212,7 +212,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 26, "id": "dc5573ef-734b-44d8-a4c4-0df19d655975", "metadata": {}, "outputs": [], @@ -222,7 +222,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 27, "id": "eddf6501-8428-44cc-8d2d-e245803a3943", "metadata": { "editable": true, @@ -280,7 +280,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 28, "id": "e67642d3-cc6c-4c5d-b5a3-2fe18364ad71", "metadata": {}, "outputs": [], @@ -304,7 +304,7 @@ }, { "cell_type": "code", - "execution_count": 11, + "execution_count": 29, "id": "144e3716-b0e7-4a39-9a25-ededea506f4f", "metadata": {}, "outputs": [], @@ -352,7 +352,7 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 78, "id": "858e405e-7c02-4193-8abe-f23951761b09", "metadata": { "editable": true, @@ -361,9 +361,32 @@ }, "tags": [] }, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading /data/tables/scratch/13100102.csv\n", + "Index(['REF_DATE', 'GEO', 'DGUID',\n", + " 'North American Industry Classification System (NAICS)',\n", + " 'Summary statistics', 'UOM', 'UOM_ID', 'SCALAR_FACTOR', 'SCALAR_ID',\n", + " 'VECTOR', 'COORDINATE', 'VALUE', 'STATUS', 'SYMBOL', 'TERMINATED',\n", + " 'DECIMALS'],\n", + " dtype='object')\n", + "{'engine': 'c', 'low_memory': True, 'nrows': 1000000, 'dtype': {'DECIMALS': 'int8', 'SCALAR_ID': 'int8', 'UOM_ID': 'int16', 'REF_DATE': 'string', 'GEO': 'string', 'DGUID': 'string', 'North American Industry Classification System (NAICS)': 'string', 'Summary statistics': 'string', 'UOM': 'string', 'SCALAR_FACTOR': 'string', 'VECTOR': 'string', 'COORDINATE': 'string', 'STATUS': 'string', 'SYMBOL': 'string', 'TERMINATED': 'string'}}\n", + "Reading /data/tables/scratch/13100102.csv as a Pandas dataframe\n" + ] + } + ], "source": [ - "product_id = \"43100011\"\n", + "\"\"\"\n", + "Examples: \n", + "- productId 43100011 has all with DECIMAL = 1 (float64)\n", + "- productId 17100009 has DECIMAL = 0 (int64)\n", + "- productId 35100076 has multiple DECIMAL precisions [0, 1, 2] (int64, float64, float64)\n", + "\"\"\"\n", + "\n", + "product_id = \"13100102\"\n", "#def process_cube(product_id, language=\"en\"):\n", "language = \"en\"\n", "cur.execute(\"SELECT product_id FROM downloaded WHERE product_id = ?\", (product_id,))\n", @@ -385,13 +408,16 @@ "# json.dump(metadata, outfile)\n", "# Read CSV using Pandas\n", "product_csv = f\"{scratch_folder}/{product_id}.csv\"\n", + "print(f\"Reading {product_csv}\")\n", "parameters = {\n", " \"engine\": \"c\",\n", " \"low_memory\": True,\n", - " #\"nrows\": 1000000,\n", + " \"nrows\": 1000000,\n", " \"dtype\": {}\n", "}\n", + "\n", "columns = pd.read_csv(product_csv, nrows=0).columns\n", + "print(columns)\n", "\n", "columns_always_int_8 = [\"DECIMALS\", \"SCALAR_ID\"]\n", "for column in columns_always_int_8:\n", @@ -407,24 +433,46 @@ " if column not in columns_always_int_8 and column not in columns_always_int_16 and column != \"VALUE\":\n", " parameters[\"dtype\"][column] = 'string'\n", "\n", + "print(parameters)\n", "if not parameters[\"dtype\"]:\n", " del parameters[\"dtype\"]\n", "\n", "print(f\"Reading {product_csv} as a Pandas dataframe\")\n", - "df = pd.read_csv(product_csv, **parameters)\n", - "\n", - "unique_decimal_values = len(df[\"DECIMALS\"].unique())\n", - "if unique_decimal_values > 1:\n", + "df = pd.read_csv(product_csv, **parameters)" + ] + }, + { + "cell_type": "code", + "execution_count": 79, + "id": "7579a135-1dfe-4fc0-991b-4b261d6577e0", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[1]\n", + "{'VALUE': 'float64'}\n" + ] + } + ], + "source": [ + "unique_decimal_values = df[\"DECIMALS\"].unique()\n", + "print(unique_decimal_values)\n", + "if any(unique_decimal_values):\n", " \"\"\"\n", - " Example of when you can have a table with DECIMALS = 0 and DECIMALS = 1, productId 11100025\n", + " A table can have both float and integer in the VALUE field. \n", + " productId 11100025 is an example\n", + " So if we have unique values for DECIMALS to be [0,1], then we convert to float64\n", " \"\"\"\n", - " # Turn to float if not already\n", " convert_dict = {\"VALUE\": \"float64\"}\n", + " print(convert_dict)\n", " df = df.astype(convert_dict)\n", - "elif unique_decimal_values == 1:\n", + "elif 0 in (unique_decimal_values):\n", " if df[\"VALUE\"].dtype != \"int64\":\n", - " # Turn to int64 if not already\n", + " # If DECIMALS = [0]\n", " convert_dict = {\"VALUE\": \"int64\"}\n", + " print(convert_dict)\n", " df = df.astype(convert_dict)\n", "\n", "df = convert_to_lowest_type(df)\n", @@ -433,7 +481,203 @@ }, { "cell_type": "code", - "execution_count": 22, + "execution_count": 80, + "id": "6c2781b3-8eea-4317-a8c0-083d97ee04fc", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
REF_DATEREF_START_DATEREF_END_DATEGEODGUIDNorth American Industry Classification System (NAICS)Summary statisticsUOMUOM_IDSCALAR_FACTORSCALAR_IDVECTORCOORDINATEVALUESTATUSSYMBOLTERMINATEDDECIMALS
020142014-01-012014-12-31Canada2016A000011124Nursing and residential care facilities [623]Operating revenueDollars81millions6v1148091891.1.19310.7<NA><NA><NA>1
120142014-01-012014-12-31Canada2016A000011124Nursing and residential care facilities [623]Operating expensesDollars81millions6v1148091901.1.28499.5<NA><NA><NA>1
220142014-01-012014-12-31Canada2016A000011124Nursing and residential care facilities [623]Salaries, wages, commissions and benefitsDollars81millions6v1148091911.1.34630.3<NA><NA><NA>1
320142014-01-012014-12-31Canada2016A000011124Nursing and residential care facilities [623]Operating profit marginPercent239units0v1148091921.1.48.7<NA><NA><NA>1
420142014-01-012014-12-31Newfoundland and Labrador2016A000210Nursing and residential care facilities [623]Operating revenueDollars81millions6v1148091932.1.197.9<NA><NA><NA>1
\n", + "
" + ], + "text/plain": [ + " REF_DATE REF_START_DATE REF_END_DATE GEO \\\n", + "0 2014 2014-01-01 2014-12-31 Canada \n", + "1 2014 2014-01-01 2014-12-31 Canada \n", + "2 2014 2014-01-01 2014-12-31 Canada \n", + "3 2014 2014-01-01 2014-12-31 Canada \n", + "4 2014 2014-01-01 2014-12-31 Newfoundland and Labrador \n", + "\n", + " DGUID North American Industry Classification System (NAICS) \\\n", + "0 2016A000011124 Nursing and residential care facilities [623] \n", + "1 2016A000011124 Nursing and residential care facilities [623] \n", + "2 2016A000011124 Nursing and residential care facilities [623] \n", + "3 2016A000011124 Nursing and residential care facilities [623] \n", + "4 2016A000210 Nursing and residential care facilities [623] \n", + "\n", + " Summary statistics UOM UOM_ID SCALAR_FACTOR \\\n", + "0 Operating revenue Dollars 81 millions \n", + "1 Operating expenses Dollars 81 millions \n", + "2 Salaries, wages, commissions and benefits Dollars 81 millions \n", + "3 Operating profit margin Percent 239 units \n", + "4 Operating revenue Dollars 81 millions \n", + "\n", + " SCALAR_ID VECTOR COORDINATE VALUE STATUS SYMBOL TERMINATED DECIMALS \n", + "0 6 v114809189 1.1.1 9310.7 1 \n", + "1 6 v114809190 1.1.2 8499.5 1 \n", + "2 6 v114809191 1.1.3 4630.3 1 \n", + "3 0 v114809192 1.1.4 8.7 1 \n", + "4 6 v114809193 2.1.1 97.9 1 " + ] + }, + "execution_count": 80, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df.head()" + ] + }, + { + "cell_type": "code", + "execution_count": 81, "id": "49cc1fa3-1ac8-4510-b4fd-7827a041e4a9", "metadata": { "editable": true, @@ -447,14 +691,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "Exporting dataframe as parquet to /data/tables/output/en/43100011.parquet\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "IOStream.flush timed out\n" + "Exporting dataframe as parquet to /data/tables/output/en/13100102.parquet\n" ] } ], @@ -515,12 +752,35 @@ }, { "cell_type": "code", - "execution_count": 24, + "execution_count": 40, "id": "06fb89ad-77ba-46db-bb88-15f5636d707d", "metadata": {}, + "outputs": [ + { + "ename": "NameError", + "evalue": "name 'process_cube' is not defined", + "output_type": "error", + "traceback": [ + "\u001b[31m---------------------------------------------------------------------------\u001b[39m", + "\u001b[31mNameError\u001b[39m Traceback (most recent call last)", + "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[40]\u001b[39m\u001b[32m, line 1\u001b[39m\n\u001b[32m----> \u001b[39m\u001b[32m1\u001b[39m \u001b[43mprocess_cube\u001b[49m(\u001b[33m\"\u001b[39m\u001b[33m43100011\u001b[39m\u001b[33m\"\u001b[39m)\n", + "\u001b[31mNameError\u001b[39m: name 'process_cube' is not defined" + ] + } + ], + "source": [ + "process_cube(\"37100216\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9cc04f15-006f-4a3a-9610-65736820ba84", + "metadata": {}, "outputs": [], "source": [ - "#process_cube(\"43100011\")" + "# This one has multiple DECIMAL precision values\n", + "process_cube(\"43100011)" ] } ],