basic place geometry creation from initial query

This commit is contained in:
Martha
2025-12-04 11:55:40 +00:00
parent 1a11473421
commit 61cce53e4c
2 changed files with 286 additions and 210 deletions
+1 -4
View File
@@ -1,10 +1,7 @@
from langchain.agents import AgentState as BaseAgentState from langchain.agents import AgentState as BaseAgentState
from geojson_pydantic import FeatureCollection from geojson_pydantic import FeatureCollection
from typing import Optional from typing import Optional
from pydantic import Field
class AgentState(BaseAgentState):
    """Agent state extended with the place selected for the analysis."""

    # GeoJSON FeatureCollection for the place under analysis.
    # NOTE(review): presumably written by the `get_place` tool via its
    # Command update under the "place" key -- confirm against the graph wiring.
    # No default is declared here, so the key may be absent until it is set.
    place: Optional[FeatureCollection]
+285 -206
View File
@@ -1,248 +1,327 @@
from typing import Optional, Annotated from typing import Annotated
import duckdb import duckdb
from geojson_pydantic import Feature import json
from shapely import wkt
from shapely.geometry import mapping
from langchain_core.tools import tool from langchain_core.tools import tool
from langgraph.types import Command from langgraph.types import Command
from langchain_core.messages import ToolMessage from langchain_core.messages import ToolMessage
from langchain_core.tools.base import InjectedToolCallId from langchain_core.tools.base import InjectedToolCallId
def create_database_connection(database_path: str = ":memory:"):
    """Create and configure a DuckDB connection with necessary extensions.

    Installs and then loads the ``spatial`` and ``httpfs`` extensions on a
    fresh connection so callers can run spatial SQL against remote files.

    Args:
        database_path: Path to the DuckDB database file. Defaults to
            ``":memory:"``, which matches the previous behaviour of calling
            ``duckdb.connect()`` with no arguments.

    Returns:
        Configured DuckDB connection. The caller owns it and must close it.

    Raises:
        Whatever DuckDB raises if connecting, installing, or loading an
        extension fails; the half-configured connection is closed first.
    """
    connection = duckdb.connect(database_path)
    try:
        # INSTALL fetches the extension if it is not already present locally;
        # it must happen before the extension can be loaded.
        connection.execute("INSTALL spatial;")
        connection.execute("INSTALL httpfs;")
        connection.load_extension("spatial")
        connection.load_extension("httpfs")
    except Exception:
        # Do not leak the connection when setup fails part-way through.
        connection.close()
        raise
    return connection
@tool
def get_place(
    place_name: str, tool_call_id: Annotated[str, InjectedToolCallId] = ""
) -> Command:
    """Get place location from Overture Maps based on user input place name."""
    # NOTE(review): the docstring above is intentionally left unchanged --
    # presumably `@tool` exposes it to the model as the tool description
    # (confirm against the LangChain docs), so the contract is documented here.
    #
    # Parameters:
    #   place_name:   free-text place name; compared case-insensitively with
    #                 each place's primary name using Jaro-Winkler similarity.
    #   tool_call_id: injected tool-call id, echoed back in the ToolMessage.
    # Returns:
    #   A Command whose update always carries one ToolMessage. When a place
    #   scores above 0.5, the update also sets "place" to a GeoJSON
    #   FeatureCollection dict holding that single best match; when nothing
    #   matches, "place" is left untouched and the message says so.

    # `place_name` is bound through `?` placeholders rather than interpolated
    # into the SQL text: names containing quotes (e.g. "O'Hare") would break
    # an f-string query, and the value ultimately comes from user input.
    place_query = """
        SELECT
            id,
            names.primary AS name,
            ST_AsGeoJSON(geometry) AS geometry,
            jaro_winkler_similarity(LOWER(names.primary), LOWER(?)) AS similarity_score
        FROM read_parquet(
            's3://overturemaps-us-west-2/release/2025-11-19.0/theme=places/type=place/*',
            filename=true,
            hive_partitioning=1
        )
        WHERE jaro_winkler_similarity(LOWER(names.primary), LOWER(?)) > 0.5
        ORDER BY similarity_score DESC
        LIMIT 1;
    """

    # The spatial extension is already loaded by create_database_connection(),
    # so no extra LOAD is issued here.
    db_connection = create_database_connection()
    try:
        db_connection.execute("SET s3_region='us-west-2';")
        best_match = db_connection.execute(
            place_query, [place_name, place_name]
        ).fetchone()
    finally:
        # Release the connection even when the remote query fails.
        db_connection.close()

    if best_match is None:
        # Nothing scored above the threshold: report it instead of raising
        # IndexError, and do not overwrite any place already held in state.
        return Command(
            update={
                "messages": [
                    ToolMessage(
                        content=f"No Overture place found matching '{place_name}'",
                        tool_call_id=tool_call_id,
                    )
                ],
            },
        )

    overture_id, overture_name, geometry_json, _similarity_score = best_match

    # ST_AsGeoJSON yields the geometry as GeoJSON text; decode it so it can be
    # embedded as a nested object in the feature.
    geometry = json.loads(geometry_json)

    # Create FeatureCollection
    feature_collection = {
        "type": "FeatureCollection",
        "features": [
            {
                "type": "Feature",
                "geometry": geometry,
                "properties": {
                    "name": overture_name,
                    "overture_id": overture_id,
                },
            }
        ],
    }

    return Command(
        update={
            "place": feature_collection,
            "messages": [
                ToolMessage(
                    content=f"Found place with Overture name: {overture_name} based on user query",
                    tool_call_id=tool_call_id,
                )
            ],
        },
    )
@tool # @tool
def geocode_division( # def get_overture_locations(
query: str, # area_of_interest: Feature,
level: Optional[str] = None, # place_name: Optional[str] = None,
overture_release: str = "2024-11-13.0", # place_type: Optional[str] = None,
similarity_threshold: float = 0.6, # overture_release: str = "2024-11-13.0",
limit: int = 10, # similarity_threshold: float = 0.6,
tool_call_id: Annotated[str, InjectedToolCallId] = "", # tool_call_id: Annotated[str, InjectedToolCallId] = "",
) -> Command: # ) -> Command:
""" # """
Geocode a place name using Overture divisions data. # Get locations from Overture Maps.
Parameters # Parameters
---------- # ----------
query : str # area_of_interest : Feature
Place name to search for (e.g., "San Francisco", "California", "United States") # Area of interest to search for locations in
level : str, optional # place_name : str, optional
Division level to filter by. Options: # Name of the place to search for
- 'country' # place_type : str, optional
- 'region' (states, provinces) # Type of the place to search for
- 'county' (counties, districts) # overture_release : str
- 'locality' (cities, towns) # Overture Maps release version
- 'localadmin' (local administrative areas) # similarity_threshold : float
- 'neighborhood' # Minimum similarity score (0-1) for fuzzy name matching
overture_release : str # tool_call_id : str
Overture Maps release version # Tool call ID
similarity_threshold : float
Minimum similarity score (0-1) for fuzzy name matching
limit : int
Maximum number of results to return
Returns # Returns
------- # -------
Command # Command
Command that updates state with division features # Command that updates state with location features
""" # """
con = duckdb.connect() # con = duckdb.connect()
con.execute("INSTALL spatial;") # con.execute("INSTALL spatial;")
con.execute("LOAD spatial;") # con.execute("LOAD spatial;")
con.execute("INSTALL httpfs;") # con.execute("INSTALL httpfs;")
con.execute("LOAD httpfs;") # con.execute("LOAD httpfs;")
base_url = f"s3://overturemaps-us-west-2/release/{overture_release}/theme=divisions/type=division/*" # con.execute(
# """
# CREATE OR REPLACE TABLE aoi AS
# SELECT ST_GeomFromGeoJSON(?) AS geom
# """,
# [area_of_interest.geometry.model_dump_json()],
# )
where_conditions = [ # base_url = f"s3://overturemaps-us-west-2/release/{overture_release}/theme=places/type=place/*"
f"jaro_winkler_similarity(LOWER(names.primary), LOWER('{query}')) >= {similarity_threshold}"
]
if level: # where_conditions = ["ST_Within(ST_GeomFromWKB(geometry), (SELECT geom FROM aoi))"]
where_conditions.append(f"subtype = '{level}'")
where_clause = " AND ".join(where_conditions) # if place_type:
# where_conditions.append(f"categories.primary = '{place_type}'")
query_sql = f""" # if place_name:
SELECT # where_conditions.append(
id, # f"jaro_winkler_similarity(LOWER(names.primary), LOWER('{place_name}')) >= {similarity_threshold}"
ST_AsText(ST_GeomFromWKB(geometry)) as geometry_wkt, # )
names.primary as name,
names.common as common_names,
subtype as division_level,
country,
region,
hierarchies,
population,
capital,
wikidata,
sources,
jaro_winkler_similarity(LOWER(names.primary), LOWER('{query}')) as similarity_score
FROM read_parquet('{base_url}', filename=true, hive_partitioning=1)
WHERE {where_clause}
ORDER BY similarity_score DESC
LIMIT {limit}
"""
result = con.execute(query_sql).fetchall() # where_clause = " AND ".join(where_conditions)
columns = [desc[0] for desc in con.description]
divisions = [dict(zip(columns, row)) for row in result] # query = f"""
# SELECT
# id,
# ST_AsText(ST_GeomFromWKB(geometry)) as geometry_wkt,
# names.primary as name,
# categories.primary as primary_category,
# confidence,
# websites,
# phones,
# addresses
# FROM read_parquet('{base_url}', filename=true, hive_partitioning=1)
# WHERE {where_clause}
# """
# Convert divisions to GeoJSON Features # result = con.execute(query).fetchall()
features = [] # columns = [desc[0] for desc in con.description]
for div in divisions:
# Parse WKT geometry to GeoJSON
geom_wkt = div.get("geometry_wkt")
if geom_wkt:
shapely_geom = wkt.loads(geom_wkt)
geom_dict = mapping(shapely_geom)
# Create properties from division data # locations = [dict(zip(columns, row)) for row in result]
properties = {
"id": div.get("id"),
"name": div.get("name"),
"common_names": div.get("common_names"),
"division_level": div.get("division_level"),
"country": div.get("country"),
"region": div.get("region"),
"hierarchies": div.get("hierarchies"),
"population": div.get("population"),
"capital": div.get("capital"),
"wikidata": div.get("wikidata"),
"sources": div.get("sources"),
"similarity_score": div.get("similarity_score"),
}
feature = Feature(geometry=geom_dict, properties=properties) # # Convert locations to GeoJSON Features
features.append(feature) # features = []
# for loc in locations:
# # Parse WKT geometry to GeoJSON
# geom_wkt = loc.get("geometry_wkt")
# if geom_wkt:
# shapely_geom = wkt.loads(geom_wkt)
# geom_dict = mapping(shapely_geom)
con.close() # # Create properties from location data
# properties = {
# "id": loc.get("id"),
# "name": loc.get("name"),
# "primary_category": loc.get("primary_category"),
# "confidence": loc.get("confidence"),
# "websites": loc.get("websites"),
# "phones": loc.get("phones"),
# "addresses": loc.get("addresses"),
# }
tool_message = f"Found {len(features)} divisions matching '{query}'" # feature = Feature(geometry=geom_dict, properties=properties)
# features.append(feature)
return Command( # con.close()
update={
"features": features, # tool_message = f"Found {len(features)} locations matching the criteria"
"messages": [ToolMessage(content=tool_message, tool_call_id=tool_call_id)],
}, # return Command(
) # update={
# "features": features,
# "messages": [ToolMessage(content=tool_message, tool_call_id=tool_call_id)],
# },
# )
# @tool
# def geocode_division(
# query: str,
# level: Optional[str] = None,
# overture_release: str = "2024-11-13.0",
# similarity_threshold: float = 0.6,
# limit: int = 10,
# tool_call_id: Annotated[str, InjectedToolCallId] = "",
# ) -> Command:
# """
# Geocode a place name using Overture divisions data.
# Parameters
# ----------
# query : str
# Place name to search for (e.g., "San Francisco", "California", "United States")
# level : str, optional
# Division level to filter by. Options:
# - 'country'
# - 'region' (states, provinces)
# - 'county' (counties, districts)
# - 'locality' (cities, towns)
# - 'localadmin' (local administrative areas)
# - 'neighborhood'
# overture_release : str
# Overture Maps release version
# similarity_threshold : float
# Minimum similarity score (0-1) for fuzzy name matching
# limit : int
# Maximum number of results to return
# Returns
# -------
# Command
# Command that updates state with division features
# """
# con = duckdb.connect()
# con.execute("INSTALL spatial;")
# con.execute("LOAD spatial;")
# con.execute("INSTALL httpfs;")
# con.execute("LOAD httpfs;")
# base_url = f"s3://overturemaps-us-west-2/release/{overture_release}/theme=divisions/type=division/*"
# where_conditions = [
# f"jaro_winkler_similarity(LOWER(names.primary), LOWER('{query}')) >= {similarity_threshold}"
# ]
# if level:
# where_conditions.append(f"subtype = '{level}'")
# where_clause = " AND ".join(where_conditions)
# query_sql = f"""
# SELECT
# id,
# ST_AsText(ST_GeomFromWKB(geometry)) as geometry_wkt,
# names.primary as name,
# names.common as common_names,
# subtype as division_level,
# country,
# region,
# hierarchies,
# population,
# capital,
# wikidata,
# sources,
# jaro_winkler_similarity(LOWER(names.primary), LOWER('{query}')) as similarity_score
# FROM read_parquet('{base_url}', filename=true, hive_partitioning=1)
# WHERE {where_clause}
# ORDER BY similarity_score DESC
# LIMIT {limit}
# """
# result = con.execute(query_sql).fetchall()
# columns = [desc[0] for desc in con.description]
# divisions = [dict(zip(columns, row)) for row in result]
# # Convert divisions to GeoJSON Features
# features = []
# for div in divisions:
# # Parse WKT geometry to GeoJSON
# geom_wkt = div.get("geometry_wkt")
# if geom_wkt:
# shapely_geom = wkt.loads(geom_wkt)
# geom_dict = mapping(shapely_geom)
# # Create properties from division data
# properties = {
# "id": div.get("id"),
# "name": div.get("name"),
# "common_names": div.get("common_names"),
# "division_level": div.get("division_level"),
# "country": div.get("country"),
# "region": div.get("region"),
# "hierarchies": div.get("hierarchies"),
# "population": div.get("population"),
# "capital": div.get("capital"),
# "wikidata": div.get("wikidata"),
# "sources": div.get("sources"),
# "similarity_score": div.get("similarity_score"),
# }
# feature = Feature(geometry=geom_dict, properties=properties)
# features.append(feature)
# con.close()
# tool_message = f"Found {len(features)} divisions matching '{query}'"
# return Command(
# update={
# "features": features,
# "messages": [ToolMessage(content=tool_message, tool_call_id=tool_call_id)],
# },
# )