mirror of
https://github.com/dataforcanada/d4c-api.git
synced 2026-06-13 14:30:57 +02:00
This does not belong here. Will be moved to https://github.com/dataforcanada/d4c-datapkg-statistical
This commit is contained in:
-403
@@ -1,403 +0,0 @@
|
||||
"""
|
||||
TODO:
|
||||
- Modularize code
|
||||
- Improve documentation
|
||||
- Create test cases
|
||||
"""
|
||||
from datetime import datetime
|
||||
import glob
|
||||
from multiprocessing import Pool
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
import zipfile
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
import pandas as pd
|
||||
"""
|
||||
TODO: Look into whether polars CSV parsing has improved since 2025-06-19.
|
||||
I encountered a bunch of issues with properly parsing StatCan CSVs
|
||||
"""
|
||||
import polars as pl
|
||||
import requests
|
||||
from tqdm import tqdm
|
||||
|
||||
data_folder = "/data/tables"
|
||||
input_folder = f"{data_folder}/input"
|
||||
scratch_folder = f"{data_folder}/scratch"
|
||||
output_folder = f"{data_folder}/output"
|
||||
|
||||
"""
|
||||
TODO: Do logging properly
|
||||
"""
|
||||
import logging
|
||||
logging.basicConfig(
|
||||
filename="processing.log",
|
||||
encoding="utf-8",
|
||||
filemode="a",
|
||||
format="{asctime} - {levelname} - {message}",
|
||||
style="{",
|
||||
datefmt="%Y-%m-%d %H:%M",
|
||||
)
|
||||
|
||||
"""
|
||||
TODO: Do this properly. These are the requirements:
|
||||
- Lightweight way for users to keep track what StatCan tables they have
|
||||
downloaded, and for them to update the table when new data is available
|
||||
"""
|
||||
if not os.path.exists(f"{data_folder}/processing.db"):
|
||||
con = sqlite3.connect(f"{data_folder}/processing.db", timeout=60.0)
|
||||
cur = con.cursor()
|
||||
cur.executescript("""
|
||||
CREATE TABLE IF NOT EXISTS downloaded (
|
||||
product_id TEXT PRIMARY KEY,
|
||||
last_updated TEXT,
|
||||
last_processed TEXT
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS cubes (
|
||||
product_id TEXT PRIMARY KEY,
|
||||
last_updated TEXT
|
||||
);
|
||||
""")
|
||||
con.commit()
|
||||
else:
|
||||
con = sqlite3.connect(f"{data_folder}/processing.db", timeout=60.0)
|
||||
cur = con.cursor()
|
||||
|
||||
def setup():
|
||||
"""
|
||||
Makes data folders
|
||||
"""
|
||||
folders_to_create = [data_folder, input_folder,
|
||||
scratch_folder, output_folder,
|
||||
f"{input_folder}/en", f"{output_folder}/en",
|
||||
f"{input_folder}/fr", f"{output_folder}/fr",
|
||||
f"{input_folder}/metadata"]
|
||||
for folder in folders_to_create:
|
||||
if not os.path.exists(folder):
|
||||
print(f"Making folder {folder}")
|
||||
os.mkdir(folder)
|
||||
|
||||
def update_last_downloaded(product_id):
|
||||
"""
|
||||
Updates SQLite database with the last time the table was updated
|
||||
The datetime is in Eastern timezone, so have to convert to UTC to
|
||||
be consistent with https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite
|
||||
"""
|
||||
filepath = f"{input_folder}/metadata/{product_id}.json"
|
||||
print(f"Reading metadata {filepath}")
|
||||
with open(filepath, 'r') as fp:
|
||||
metadata = json.load(fp)
|
||||
product_id = metadata.get("object").get("productId")
|
||||
last_updated = metadata.get("object").get("releaseTime")
|
||||
# Convert last_updated to UTC since /getAllcubesListLite uses UTC
|
||||
last_updated = datetime.strptime(last_updated, "%Y-%m-%dT%H:%M")
|
||||
last_updated = last_updated.replace(tzinfo=ZoneInfo("America/Toronto"))
|
||||
last_updated = last_updated.astimezone(ZoneInfo("UTC")).isoformat()
|
||||
|
||||
data = (product_id, last_updated)
|
||||
cur.execute("SELECT product_id FROM downloaded WHERE product_id = ?", (product_id,))
|
||||
result = cur.fetchone()
|
||||
if not result:
|
||||
cur.execute("INSERT INTO downloaded (product_id, last_updated) VALUES (?, ?)", data)
|
||||
else:
|
||||
cur.execute("UPDATE downloaded SET last_updated = ? WHERE product_id = ?", (last_updated, product_id))
|
||||
|
||||
con.commit()
|
||||
|
||||
def update_last_processed(product_id):
|
||||
time_finished_processing = datetime.now().isoformat()
|
||||
cur.execute("UPDATE downloaded SET last_processed = ? WHERE product_id = ?", (time_finished_processing, product_id))
|
||||
con.commit()
|
||||
|
||||
def update_tables():
|
||||
"""
|
||||
This currently does not work as expected because Statistics Canada has discrepancies.
|
||||
The "releaseTime" listed in https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite
|
||||
for every pdocutId is not the same as "releaseTime" listed when making a POST
|
||||
https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata , for example:
|
||||
[{"productId":10100007}]
|
||||
"""
|
||||
cur.execute("""
|
||||
DELETE FROM cubes;
|
||||
""")
|
||||
con.commit()
|
||||
response = requests.get("https://www150.statcan.gc.ca/t1/wds/rest/getAllCubesListLite").json()
|
||||
cubes_metadata = pl.from_dicts(response)[['productId', 'releaseTime']]
|
||||
cubes_metadata = cubes_metadata.rename({"productId": "product_id", "releaseTime": "last_updated"})
|
||||
cubes_metadata = cubes_metadata.rows()
|
||||
cubes_metadata_new = []
|
||||
for cube in cubes_metadata:
|
||||
product_id, last_updated = cube
|
||||
# Update the date field so it is formatted the same as date field in downloaded table
|
||||
last_updated = datetime.strptime(last_updated, "%Y-%m-%dT%H:%M:%SZ").astimezone(ZoneInfo("UTC"))
|
||||
last_updated = last_updated.isoformat()
|
||||
cubes_metadata_new.append((product_id, last_updated))
|
||||
|
||||
cur.executemany("INSERT INTO cubes VALUES(?, ?)", cubes_metadata_new)
|
||||
con.commit()
|
||||
|
||||
cur.execute("""
|
||||
SELECT a.product_id
|
||||
FROM downloaded AS a,
|
||||
cubes AS b
|
||||
WHERE a.product_id = b.product_id
|
||||
AND b.last_updated > a.last_updated
|
||||
""")
|
||||
results = cur.fetchall()
|
||||
for result in results:
|
||||
product_id = result[0]
|
||||
print(f"Updating product_id: {product_id}")
|
||||
download_cube(product_id)
|
||||
process_cube(product_id)
|
||||
|
||||
def compute_ref_date_bounds(df):
|
||||
"""
|
||||
TODO:
|
||||
- Completely rewrite this, AI helpmed me figure out this logic as I had
|
||||
no experience with dates
|
||||
"""
|
||||
"""
|
||||
- There are cases where the REF_DATE is a range, ex. 2023/2024.
|
||||
For productId 17100022 the period is from 2023-07-01 to 2024-06-30
|
||||
(seen in the metadata), so we cannot just use 2023-01-01 and 2024-12-31
|
||||
"""
|
||||
series = df["REF_DATE"]
|
||||
# Initialize the two new columns with NaT
|
||||
df["REF_START_DATE"] = pd.NaT
|
||||
df["REF_END_DATE"] = pd.NaT
|
||||
|
||||
# Skip rows that contain slashes
|
||||
valid_mask = ~series.str.contains("/", na=False)
|
||||
|
||||
# Case 1: YYYY-MM-DD
|
||||
full_mask = valid_mask & series.str.fullmatch(r"\d{4}-\d{2}-\d{2}")
|
||||
parsed_full = pd.to_datetime(series[full_mask], format="%Y-%m-%d", errors="coerce")
|
||||
df.loc[full_mask, "REF_START_DATE"] = parsed_full
|
||||
df.loc[full_mask, "REF_END_DATE"] = parsed_full
|
||||
|
||||
# Case 2: YYYY-MM
|
||||
month_mask = valid_mask & series.str.fullmatch(r"\d{4}-\d{2}")
|
||||
parsed_month = pd.to_datetime(series[month_mask], format="%Y-%m", errors="coerce")
|
||||
df.loc[month_mask, "REF_START_DATE"] = parsed_month
|
||||
df.loc[month_mask, "REF_END_DATE"] = parsed_month + pd.to_timedelta(
|
||||
parsed_month.dt.days_in_month - 1, unit='D'
|
||||
)
|
||||
|
||||
# Case 3: YYYY
|
||||
year_mask = valid_mask & series.str.fullmatch(r"\d{4}")
|
||||
parsed_year = pd.to_datetime(series[year_mask], format="%Y", errors="coerce")
|
||||
df.loc[year_mask, "REF_START_DATE"] = parsed_year
|
||||
df.loc[year_mask, "REF_END_DATE"] = parsed_year + pd.offsets.YearEnd(0)
|
||||
|
||||
# Move columns after REF_DATE
|
||||
ref_idx = df.columns.get_loc("REF_DATE")
|
||||
cols = list(df.columns)
|
||||
cols.remove("REF_START_DATE")
|
||||
cols.remove("REF_END_DATE")
|
||||
cols[ref_idx + 1:ref_idx + 1] = ["REF_START_DATE", "REF_END_DATE"]
|
||||
|
||||
return df[cols]
|
||||
|
||||
def convert_to_lowest_type(df):
|
||||
"""
|
||||
Convert columns to the best possible dtypes
|
||||
For example, if the column is numerical and has a maximum value of 32,000
|
||||
we can assign it a type of int16
|
||||
"""
|
||||
dtypes = pd.DataFrame(df.dtypes)
|
||||
# Downcast to the smallest numerical dtype
|
||||
for row in dtypes.itertuples():
|
||||
column = row[0]
|
||||
the_type = str(row[1])
|
||||
if the_type == 'Int64':
|
||||
df[column] = pd.to_numeric(df[column], downcast='integer')
|
||||
|
||||
return df
|
||||
|
||||
def extract_zipfile(product_id, language):
|
||||
"""
|
||||
It is faster to extract the zip file and read the CSV, than open
|
||||
via zipfile and then Pandas
|
||||
"""
|
||||
zip_file = f"{input_folder}/{language}/{product_id}.zip"
|
||||
with zipfile.ZipFile(zip_file) as myzip:
|
||||
print(f"Extracting {zip_file} to {scratch_folder}")
|
||||
myzip.extractall(path=scratch_folder)
|
||||
|
||||
def get_cube_metadata(product_id):
|
||||
url = f"https://www150.statcan.gc.ca/t1/wds/rest/getCubeMetadata"
|
||||
cubes_payload = [{"productId": product_id}]
|
||||
result = requests.post(url, json=cubes_payload)
|
||||
result = result.json()[0]
|
||||
return result
|
||||
|
||||
def download_cube(product_id, language="en"):
|
||||
"""
|
||||
Downloads the English CSV for a specific table
|
||||
"""
|
||||
download_url = f"https://www150.statcan.gc.ca/t1/wds/rest/getFullTableDownloadCSV/{product_id}/en"
|
||||
response = requests.get(download_url).json()
|
||||
zip_url = response['object']
|
||||
zip_file_name = f"{input_folder}/{language}/{product_id}.zip"
|
||||
print(f"Downloading {zip_url} to {zip_file_name}")
|
||||
response = requests.get(zip_url, stream=True, headers={"user-agent": None})
|
||||
progress_bar = tqdm(
|
||||
desc=zip_file_name,
|
||||
total=int(response.headers.get("content-length", 0)),
|
||||
unit="B",
|
||||
unit_scale=True
|
||||
)
|
||||
with open(zip_file_name, "wb") as handle:
|
||||
for chunk in response.iter_content(chunk_size=512):
|
||||
if chunk: # filter out keep-alive new chunks
|
||||
handle.write(chunk)
|
||||
progress_bar.update(len(chunk))
|
||||
progress_bar.close()
|
||||
|
||||
def cleanup_product(product_id):
|
||||
"""
|
||||
Remove the scratch files for a given productId
|
||||
"""
|
||||
print(f"Removing scratch files for productId {product_id}")
|
||||
os.remove(f"{scratch_folder}/{product_id}.csv")
|
||||
os.remove(f"{scratch_folder}/{product_id}_MetaData.csv")
|
||||
|
||||
def process_cube(product_id, language="en"):
|
||||
"""
|
||||
Examples:
|
||||
- productId 43100011 has all with DECIMAL = 1 (float64)
|
||||
- productId 17100009 has DECIMAL = 0 (int64)
|
||||
- productId 35100076 has multiple DECIMAL precisions [0, 1, 2] (int64, float64, float64)
|
||||
- The duplicate column issue just happens with two column names in all data products ("Value", "VALUE", and "Status", "STATUS")
|
||||
- productId 10100164 has two columns named the same "Value" and "VALUE". DuckDB treats column names in a case insensitve manner, so
|
||||
"Value" and "VALUE" are the same. So we will need to rename "Value" to "Value.1"
|
||||
- productId 13100902 has two columns named the same "Status" and "STATUS". We will need to rename "Status" to "STATUS"
|
||||
- productId 13100442 has 18 fields, but 19 fields were seen in line 162
|
||||
- There are cases where the "DECIMALS" column does not exist in the CSV. productId 98100001 is one example.
|
||||
In this case, we do let the .read_csv method guess the data types
|
||||
"""
|
||||
cur.execute("SELECT product_id FROM downloaded WHERE product_id = ?", (product_id,))
|
||||
result = cur.fetchone()
|
||||
if result:
|
||||
print(f"Already processed {product_id}")
|
||||
return
|
||||
extract_zipfile(product_id, language)
|
||||
"""
|
||||
The pandas column reader is better than the Polars one
|
||||
Here is an example where polars was not reading it right:
|
||||
https://www150.statcan.gc.ca/n1/tbl/csv/98100404-eng.zip
|
||||
"""
|
||||
# Get metadata
|
||||
#metadata_file = f"{input_folder}/metadata/{product_id}.json"
|
||||
#metadata = get_cube_metadata(product_id)
|
||||
#print(f"Writing metadata file {metadata_file}")
|
||||
#with open(metadata_file, "w") as outfile:
|
||||
# json.dump(metadata, outfile)
|
||||
# Read CSV using Pandas
|
||||
product_csv = f"{scratch_folder}/{product_id}.csv"
|
||||
print(f"Reading {product_csv}")
|
||||
parameters = {
|
||||
"filepath_or_buffer": product_csv,
|
||||
"engine": "c",
|
||||
#"nrows": 100000,
|
||||
"dtype": {}
|
||||
}
|
||||
columns = pd.read_csv(product_csv, nrows=0).columns
|
||||
columns_to_rename = ['Value', 'Status']
|
||||
for column in columns_to_rename:
|
||||
if column in columns:
|
||||
print(f"Renaming '{column}' to '{column}.1'")
|
||||
columns = [f'{column}.1' if x == column else x for x in columns]
|
||||
# Explicitly tell pandas to not read column names from CSV
|
||||
parameters["header"] = 0
|
||||
parameters["names"] = columns
|
||||
|
||||
columns_always_int_8 = ["DECIMALS", "SCALAR_ID"]
|
||||
for column in columns_always_int_8:
|
||||
if column in columns:
|
||||
parameters["dtype"][column] = 'int8'
|
||||
|
||||
columns_always_int_16 = ["UOM_ID"]
|
||||
for column in columns_always_int_16:
|
||||
if column in columns:
|
||||
parameters["dtype"][column] = 'int16'
|
||||
# REF_DATE, GEO, DGUID should always be string
|
||||
columns_always_string = ["REF_DATE", "GEO", "DGUID"]
|
||||
for column in columns_always_string:
|
||||
if column in columns:
|
||||
parameters["dtype"][column] = 'string'
|
||||
|
||||
# The remaining columns should be string, with the exception of VALUE
|
||||
# Added "DECIMAL" check as there can be numeric columns that are not the VALUE column
|
||||
if "DECIMALS" in columns:
|
||||
for column in columns:
|
||||
if column not in columns_always_int_8 and column not in columns_always_int_16 and column != "VALUE":
|
||||
parameters["dtype"][column] = 'string'
|
||||
|
||||
if not parameters["dtype"]:
|
||||
del parameters["dtype"]
|
||||
|
||||
print(f"Reading {product_csv} as a Pandas dataframe")
|
||||
try:
|
||||
df = pd.read_csv(**parameters)
|
||||
except Exception:
|
||||
logging.error(f"Failed to process productId: {product_id}")
|
||||
cleanup_product(product_id)
|
||||
return
|
||||
|
||||
if "DECIMALS" in columns:
|
||||
unique_decimal_values = df["DECIMALS"].unique()
|
||||
#print(unique_decimal_values)
|
||||
if any(unique_decimal_values):
|
||||
"""
|
||||
A table can have both float and integer in the VALUE field.
|
||||
productId 11100025 is an example
|
||||
So if we have unique values for DECIMALS to be [0,1], then we convert to float64
|
||||
"""
|
||||
convert_dict = {"VALUE": "float64"}
|
||||
#print(convert_dict)
|
||||
df = df.astype(convert_dict)
|
||||
elif 0 in (unique_decimal_values):
|
||||
if df["VALUE"].dtype != "Int64":
|
||||
# If DECIMALS = [0]
|
||||
convert_dict = {"VALUE": "Int64"}
|
||||
#print(convert_dict)
|
||||
df = df.astype(convert_dict)
|
||||
else:
|
||||
parameters = {
|
||||
"convert_string": True,
|
||||
"convert_boolean": False
|
||||
}
|
||||
#print("DECIMALS not in columns, using .convert_dtypes")
|
||||
df = df.convert_dtypes(**parameters)
|
||||
|
||||
df = convert_to_lowest_type(df)
|
||||
df = compute_ref_date_bounds(df)
|
||||
output_parquet = f"{output_folder}/{language}/{product_id}.parquet"
|
||||
print(f"Exporting dataframe as parquet to {output_parquet}")
|
||||
parameters = {
|
||||
"path": output_parquet,
|
||||
"engine": "pyarrow",
|
||||
"compression": "zstd",
|
||||
"index": False,
|
||||
"compression_level": 22
|
||||
}
|
||||
df.to_parquet(**parameters)
|
||||
# Remove scratch files
|
||||
cleanup_product(product_id)
|
||||
update_last_downloaded(product_id)
|
||||
update_last_processed(product_id)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
setup()
|
||||
files_to_process = glob.glob(f"{input_folder}/en/*.zip")
|
||||
# Get the product_id
|
||||
files_to_process = [x.split("/")[-1].split(".zip")[0] for x in files_to_process]
|
||||
to_process = len(files_to_process)
|
||||
print(f"Processing {to_process}")
|
||||
with Pool(processes=2) as p:
|
||||
p.map(process_cube, files_to_process, chunksize=1)
|
||||
Reference in New Issue
Block a user