From c0899080f4123de1a39500d4d2009a72ef37b671 Mon Sep 17 00:00:00 2001 From: Diego Ripley Date: Wed, 18 Jun 2025 09:26:18 -0400 Subject: [PATCH] Remove scratch files after processing. Was running out of space --- .../process_files_multiprocessing.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/experiments/statcan_products/process_files_multiprocessing.py b/experiments/statcan_products/process_files_multiprocessing.py index 185e82e..580cfbc 100644 --- a/experiments/statcan_products/process_files_multiprocessing.py +++ b/experiments/statcan_products/process_files_multiprocessing.py @@ -1,5 +1,6 @@ import glob from multiprocessing import Pool +import os import zipfile import pandas as pd @@ -24,16 +25,14 @@ def convert_to_lowest_type(df): df = df.convert_dtypes(**params) dtypes = pd.DataFrame(df.dtypes) - # Downcast to the smallest numerical dtype for row in dtypes.itertuples(): column = row[0] the_type = str(row[1]) - # Skipping downcasting Float64 as there were issues with decimal places # For example, instead of a value being 65.4, it turned into 65.4000015258789 if the_type == 'Float64': - continue + continue elif the_type == 'Int64': df[column] = pd.to_numeric(df[column], downcast='integer') @@ -54,13 +53,13 @@ def process_table(product_id, language="en"): extract_zipfile(product_id, language) """ The pandas column reader is better than the Polars one - Here is an example where polars was not reading it right: https://www150.statcan.gc.ca/n1/tbl/csv/98100404-eng.zip """ product_csv = f"{scratch_folder}/{product_id}.csv" parameters = { - "engine": "pyarrow" + "engine": "c", + "low_memory": True } print(f"Reading {product_csv} as a Pandas dataframe") df = pd.read_csv(product_csv, **parameters) @@ -72,10 +71,16 @@ def process_table(product_id, language="en"): df.write_parquet(f"{output_folder}/{language}/{product_id}.parquet", compression='zstd', compression_level=22) + # Remove the scratch files + print("Removing scratch files") + os.remove(f"{scratch_folder}/{product_id}.csv") + os.remove(f"{scratch_folder}/{product_id}_MetaData.csv") if __name__ == '__main__': files_to_process = glob.glob(f"{input_folder}/en/*.zip") # Get the product_id files_to_process = [x.split("/")[-1].split(".zip")[0] for x in files_to_process] + to_process = len(files_to_process) + print(f"Processing {to_process}") with Pool() as p: - p.map(process_table, files_to_process) \ No newline at end of file + p.map(process_table, files_to_process)