mirror of
https://github.com/dataforcanada/d4c-datapkg-statistical.git
synced 2026-06-13 14:10:55 +02:00
Remove scratch files after processing. Was running out of space
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
import glob
|
import glob
|
||||||
from multiprocessing import Pool
|
from multiprocessing import Pool
|
||||||
|
import os
|
||||||
import zipfile
|
import zipfile
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
@@ -24,12 +25,10 @@ def convert_to_lowest_type(df):
|
|||||||
df = df.convert_dtypes(**params)
|
df = df.convert_dtypes(**params)
|
||||||
|
|
||||||
dtypes = pd.DataFrame(df.dtypes)
|
dtypes = pd.DataFrame(df.dtypes)
|
||||||
|
|
||||||
# Downcast to the smallest numerical dtype
|
# Downcast to the smallest numerical dtype
|
||||||
for row in dtypes.itertuples():
|
for row in dtypes.itertuples():
|
||||||
column = row[0]
|
column = row[0]
|
||||||
the_type = str(row[1])
|
the_type = str(row[1])
|
||||||
|
|
||||||
# Skipping downcasting Float64 as there were issues with decimal places
|
# Skipping downcasting Float64 as there were issues with decimal places
|
||||||
# For example, instead of a value being 65.4, it turned into 65.4000015258789
|
# For example, instead of a value being 65.4, it turned into 65.4000015258789
|
||||||
if the_type == 'Float64':
|
if the_type == 'Float64':
|
||||||
@@ -54,13 +53,13 @@ def process_table(product_id, language="en"):
|
|||||||
extract_zipfile(product_id, language)
|
extract_zipfile(product_id, language)
|
||||||
"""
|
"""
|
||||||
The pandas column reader is better than the Polars one
|
The pandas column reader is better than the Polars one
|
||||||
|
|
||||||
Here is an example where polars was not reading it right:
|
Here is an example where polars was not reading it right:
|
||||||
https://www150.statcan.gc.ca/n1/tbl/csv/98100404-eng.zip
|
https://www150.statcan.gc.ca/n1/tbl/csv/98100404-eng.zip
|
||||||
"""
|
"""
|
||||||
product_csv = f"{scratch_folder}/{product_id}.csv"
|
product_csv = f"{scratch_folder}/{product_id}.csv"
|
||||||
parameters = {
|
parameters = {
|
||||||
"engine": "pyarrow"
|
"engine": "c",
|
||||||
|
"low_memory": True
|
||||||
}
|
}
|
||||||
print(f"Reading {product_csv} as a Pandas dataframe")
|
print(f"Reading {product_csv} as a Pandas dataframe")
|
||||||
df = pd.read_csv(product_csv, **parameters)
|
df = pd.read_csv(product_csv, **parameters)
|
||||||
@@ -72,10 +71,16 @@ def process_table(product_id, language="en"):
|
|||||||
df.write_parquet(f"{output_folder}/{language}/{product_id}.parquet",
|
df.write_parquet(f"{output_folder}/{language}/{product_id}.parquet",
|
||||||
compression='zstd',
|
compression='zstd',
|
||||||
compression_level=22)
|
compression_level=22)
|
||||||
|
# Remove the scratch files
|
||||||
|
print("Removing scratch files")
|
||||||
|
os.remove(f"{scratch_folder}/{product_id}.csv")
|
||||||
|
os.remove(f"{scratch_folder}/{product_id}_MetaData.csv")
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
files_to_process = glob.glob(f"{input_folder}/en/*.zip")
|
files_to_process = glob.glob(f"{input_folder}/en/*.zip")
|
||||||
# Get the product_id
|
# Get the product_id
|
||||||
files_to_process = [x.split("/")[-1].split(".zip")[0] for x in files_to_process]
|
files_to_process = [x.split("/")[-1].split(".zip")[0] for x in files_to_process]
|
||||||
|
to_process = len(files_to_process)
|
||||||
|
print(f"Processing {to_process}")
|
||||||
with Pool() as p:
|
with Pool() as p:
|
||||||
p.map(process_table, files_to_process)
|
p.map(process_table, files_to_process)
|
||||||
Reference in New Issue
Block a user