Made changes

This commit is contained in:
Diego Ripley
2025-06-26 13:35:37 +00:00
parent 4ed5fb4bbb
commit a55e1d325d
6 changed files with 2676 additions and 402 deletions
File diff suppressed because one or more lines are too long
@@ -2,7 +2,7 @@
"cells": [ "cells": [
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 7, "execution_count": 1,
"id": "b410f6f2-2ce8-4eaa-9c25-03e076a9d996", "id": "b410f6f2-2ce8-4eaa-9c25-03e076a9d996",
"metadata": { "metadata": {
"editable": true, "editable": true,
@@ -40,7 +40,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 25, "execution_count": 3,
"id": "0c7532d5-c8dc-4b93-900b-0e43d0718afd", "id": "0c7532d5-c8dc-4b93-900b-0e43d0718afd",
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
@@ -50,7 +50,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 30, "execution_count": 4,
"id": "da44a381-759b-443b-9440-c81c6cae3108", "id": "da44a381-759b-443b-9440-c81c6cae3108",
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
@@ -66,7 +66,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 31, "execution_count": 5,
"id": "16409ae7-e62e-4d32-a1f2-42300bbeb80f", "id": "16409ae7-e62e-4d32-a1f2-42300bbeb80f",
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
@@ -80,7 +80,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 32, "execution_count": 6,
"id": "f228535d-7645-447a-ad80-0367d72102a2", "id": "f228535d-7645-447a-ad80-0367d72102a2",
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
@@ -90,7 +90,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 33, "execution_count": 7,
"id": "08d51b9e-08f6-4f4c-b0e9-22142987bdf3", "id": "08d51b9e-08f6-4f4c-b0e9-22142987bdf3",
"metadata": { "metadata": {
"editable": true, "editable": true,
@@ -103,7 +103,7 @@
{ {
"data": { "data": {
"application/vnd.jupyter.widget-view+json": { "application/vnd.jupyter.widget-view+json": {
"model_id": "7e162d24346c48dcbfc550b76cdbdd93", "model_id": "340c8299e87348a2998c9c62a071decf",
"version_major": 2, "version_major": 2,
"version_minor": 1 "version_minor": 1
}, },
@@ -3,7 +3,13 @@
{ {
"cell_type": "markdown", "cell_type": "markdown",
"id": "e4d2c52d-38be-4f84-a0bf-6bd8cb577ad9", "id": "e4d2c52d-38be-4f84-a0bf-6bd8cb577ad9",
"metadata": {}, "metadata": {
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": []
},
"source": [ "source": [
"# Purpose\n", "# Purpose\n",
"I need to find out what all possible date formats are for the \"REF_DATE\" field so that when I write the parquet file people will be able to filter on it\n", "I need to find out what all possible date formats are for the \"REF_DATE\" field so that when I write the parquet file people will be able to filter on it\n",
File diff suppressed because it is too large Load Diff
@@ -2,12 +2,14 @@ from datetime import datetime
import glob import glob
from multiprocessing import Pool from multiprocessing import Pool
import json import json
import logging
import os import os
import sqlite3 import sqlite3
import zipfile import zipfile
from zoneinfo import ZoneInfo from zoneinfo import ZoneInfo
import pandas as pd import pandas as pd
import polars as pl
import requests import requests
from tqdm import tqdm from tqdm import tqdm
@@ -16,8 +18,18 @@ input_folder = f"{data_folder}/input"
scratch_folder = f"{data_folder}/scratch" scratch_folder = f"{data_folder}/scratch"
output_folder = f"{data_folder}/output" output_folder = f"{data_folder}/output"
import logging
logging.basicConfig(
filename="processing.log",
encoding="utf-8",
filemode="a",
format="{asctime} - {levelname} - {message}",
style="{",
datefmt="%Y-%m-%d %H:%M",
)
if not os.path.exists(f"{data_folder}/processing.db"): if not os.path.exists(f"{data_folder}/processing.db"):
con = sqlite3.connect(f"{data_folder}/processing.db") con = sqlite3.connect(f"{data_folder}/processing.db", timeout=60.0)
cur = con.cursor() cur = con.cursor()
cur.executescript(""" cur.executescript("""
CREATE TABLE IF NOT EXISTS downloaded ( CREATE TABLE IF NOT EXISTS downloaded (
@@ -33,7 +45,7 @@ if not os.path.exists(f"{data_folder}/processing.db"):
""") """)
con.commit() con.commit()
else: else:
con = sqlite3.connect(f"{data_folder}/processing.db") con = sqlite3.connect(f"{data_folder}/processing.db", timeout=60.0)
cur = con.cursor() cur = con.cursor()
def setup(): def setup():
@@ -223,14 +235,27 @@ def download_cube(product_id, language="en"):
progress_bar.update(len(chunk)) progress_bar.update(len(chunk))
progress_bar.close() progress_bar.close()
def cleanup_product(product_id):
"""
Remove the scratch files for a given productId
"""
print(f"Removing scratch files for productId {product_id}")
os.remove(f"{scratch_folder}/{product_id}.csv")
os.remove(f"{scratch_folder}/{product_id}_MetaData.csv")
def process_cube(product_id, language="en"): def process_cube(product_id, language="en"):
""" """
Examples: Examples:
- productId 43100011 has all with DECIMAL = 1 (float64) - productId 43100011 has all with DECIMAL = 1 (float64)
- productId 17100009 has DECIMAL = 0 (int64) - productId 17100009 has DECIMAL = 0 (int64)
- productId 35100076 has multiple DECIMAL precisions [0, 1, 2] (int64, float64, float64) - productId 35100076 has multiple DECIMAL precisions [0, 1, 2] (int64, float64, float64)
- productId 10100164 has two columns named the same "Value" and "VALUE". It is processed fine with the read_csv, and when it is exported as parquet. - The duplicate column issue just happens with two column names in all data products ("Value", "VALUE", and "Status", "STATUS")
DuckDB has an issue with it (as it is case insensitive), but Pandas and Polars are able to handle "Value" and "VALUE" - productId 10100164 has two columns named the same "Value" and "VALUE". DuckDB treats column names in a case insensitve manner, so
"Value" and "VALUE" are the same. So we will need to rename "Value" to "Value.1"
- productId 13100902 has two columns named the same "Status" and "STATUS". We will need to rename "Status" to "STATUS"
- productId 13100442 has 18 fields, but 19 fields were seen in line 162
- There are cases where the "DECIMALS" column does not exist in the CSV. productId 98100001 is one example.
In this case, we do let the .read_csv method guess the data types
""" """
cur.execute("SELECT product_id FROM downloaded WHERE product_id = ?", (product_id,)) cur.execute("SELECT product_id FROM downloaded WHERE product_id = ?", (product_id,))
result = cur.fetchone() result = cur.fetchone()
@@ -253,13 +278,22 @@ def process_cube(product_id, language="en"):
product_csv = f"{scratch_folder}/{product_id}.csv" product_csv = f"{scratch_folder}/{product_id}.csv"
print(f"Reading {product_csv}") print(f"Reading {product_csv}")
parameters = { parameters = {
"filepath_or_buffer": product_csv,
"engine": "c", "engine": "c",
"low_memory": True, #"nrows": 100000,
"nrows": 100000,
"dtype": {} "dtype": {}
} }
columns = pd.read_csv(product_csv, nrows=0).columns columns = pd.read_csv(product_csv, nrows=0).columns
columns_to_rename = ['Value', 'Status']
for column in columns_to_rename:
if column in columns:
print(f"Renaming '{column}' to '{column}.1'")
columns = [f'{column}.1' if x == column else x for x in columns]
# Explicitly tell pandas to not read column names from CSV
parameters["header"] = 0
parameters["names"] = columns
columns_always_int_8 = ["DECIMALS", "SCALAR_ID"] columns_always_int_8 = ["DECIMALS", "SCALAR_ID"]
for column in columns_always_int_8: for column in columns_always_int_8:
if column in columns: if column in columns:
@@ -270,7 +304,15 @@ def process_cube(product_id, language="en"):
if column in columns: if column in columns:
parameters["dtype"][column] = 'int16' parameters["dtype"][column] = 'int16'
# REF_DATE, GEO, DGUID should always be string
columns_always_string = ["REF_DATE", "GEO", "DGUID"]
for column in columns_always_string:
if column in columns:
parameters["dtype"][column] = 'string'
# The remaining columns should be string, with the exception of VALUE # The remaining columns should be string, with the exception of VALUE
# Added "DECIMAL" check as there can be numeric columns that are not the VALUE column
if "DECIMALS" in columns:
for column in columns: for column in columns:
if column not in columns_always_int_8 and column not in columns_always_int_16 and column != "VALUE": if column not in columns_always_int_8 and column not in columns_always_int_16 and column != "VALUE":
parameters["dtype"][column] = 'string' parameters["dtype"][column] = 'string'
@@ -279,8 +321,16 @@ def process_cube(product_id, language="en"):
del parameters["dtype"] del parameters["dtype"]
print(f"Reading {product_csv} as a Pandas dataframe") print(f"Reading {product_csv} as a Pandas dataframe")
df = pd.read_csv(product_csv, **parameters) try:
df = pd.read_csv(**parameters)
except Exception:
logging.error(f"Failed to process productId: {product_id}")
cleanup_product(product_id)
return
if "DECIMALS" in columns:
unique_decimal_values = df["DECIMALS"].unique() unique_decimal_values = df["DECIMALS"].unique()
#print(unique_decimal_values)
if any(unique_decimal_values): if any(unique_decimal_values):
""" """
A table can have both float and integer in the VALUE field. A table can have both float and integer in the VALUE field.
@@ -288,14 +338,21 @@ def process_cube(product_id, language="en"):
So if we have unique values for DECIMALS to be [0,1], then we convert to float64 So if we have unique values for DECIMALS to be [0,1], then we convert to float64
""" """
convert_dict = {"VALUE": "float64"} convert_dict = {"VALUE": "float64"}
print(convert_dict) #print(convert_dict)
df = df.astype(convert_dict) df = df.astype(convert_dict)
elif 0 in (unique_decimal_values): elif 0 in (unique_decimal_values):
if df["VALUE"].dtype != "Int64": if df["VALUE"].dtype != "Int64":
# If DECIMALS = [0] # If DECIMALS = [0]
convert_dict = {"VALUE": "Int64"} convert_dict = {"VALUE": "Int64"}
print(convert_dict) #print(convert_dict)
df = df.astype(convert_dict) df = df.astype(convert_dict)
else:
parameters = {
"convert_string": True,
"convert_boolean": False
}
#print("DECIMALS not in columns, using .convert_dtypes")
df = df.convert_dtypes(**parameters)
df = convert_to_lowest_type(df) df = convert_to_lowest_type(df)
df = compute_ref_date_bounds(df) df = compute_ref_date_bounds(df)
@@ -309,10 +366,8 @@ def process_cube(product_id, language="en"):
"compression_level": 22 "compression_level": 22
} }
df.to_parquet(**parameters) df.to_parquet(**parameters)
# Remove the scratch files # Remove scratch files
print("Removing scratch files") cleanup_product(product_id)
os.remove(f"{scratch_folder}/{product_id}.csv")
os.remove(f"{scratch_folder}/{product_id}_MetaData.csv")
update_last_downloaded(product_id) update_last_downloaded(product_id)
update_last_processed(product_id) update_last_processed(product_id)
@@ -326,5 +381,5 @@ if __name__ == '__main__':
print(f"Processing {to_process}") print(f"Processing {to_process}")
#for product_id in files_to_process: #for product_id in files_to_process:
# process_cube(product_id) # process_cube(product_id)
with Pool(processes=16) as p: with Pool(processes=2) as p:
p.map(process_cube, files_to_process, chunksize=8) p.map(process_cube, files_to_process, chunksize=1)
File diff suppressed because it is too large Load Diff