feat: Implement Phase 3 neighbourhood data model

Add schemas, parsers, loaders, and models for Toronto neighbourhood-centric
data, including census profiles, crime statistics, and amenities.

Schemas:
- NeighbourhoodRecord, CensusRecord, CrimeRecord, CrimeType
- AmenityType, AmenityRecord, AmenityCount

Models:
- BridgeCMHCNeighbourhood (zone-to-neighbourhood mapping with weights)
- FactCensus, FactCrime, FactAmenities

Parsers:
- TorontoOpenDataParser (CKAN API for neighbourhoods, census, amenities)
- TorontoPoliceParser (crime rates, MCI data)

Loaders:
- load_census_data, load_crime_data, load_amenities
- build_cmhc_neighbourhood_crosswalk (PostGIS area weights)
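
The crosswalk weight for a (zone, neighbourhood) pair is the share of the
zone's area covered by the neighbourhood polygon. One plausible formulation
of the PostGIS query behind build_cmhc_neighbourhood_crosswalk, assuming
hypothetical cmhc_zones and neighbourhoods tables with SRID-aligned geom
columns (the real schema and weighting convention may differ):

    from sqlalchemy import text

    CROSSWALK_SQL = text("""
        SELECT z.zone_id,
               n.area_id,
               ST_Area(ST_Intersection(z.geom, n.geom)) / ST_Area(z.geom) AS weight
        FROM cmhc_zones AS z
        JOIN neighbourhoods AS n ON ST_Intersects(z.geom, n.geom)
    """)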

Also updates CLAUDE.md with projman plugin workflow documentation.

Closes #53, #54, #55, #56, #57, #58, #59

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-16 11:07:13 -05:00
parent f69d0c15a7
commit 053acf6436
14 changed files with 1466 additions and 2 deletions

parsers/__init__.py

@@ -6,6 +6,8 @@ from .geo import (
NeighbourhoodParser,
load_geojson,
)
from .toronto_open_data import TorontoOpenDataParser
from .toronto_police import TorontoPoliceParser
__all__ = [
"CMHCParser",
@@ -13,4 +15,7 @@ __all__ = [
"CMHCZoneParser",
"NeighbourhoodParser",
"load_geojson",
# API parsers (Phase 3)
"TorontoOpenDataParser",
"TorontoPoliceParser",
]

parsers/toronto_open_data.py

@@ -0,0 +1,391 @@
"""Parser for Toronto Open Data CKAN API.
Fetches neighbourhood boundaries, census profiles, and amenities data
from the City of Toronto's Open Data Portal.
API Documentation: https://open.toronto.ca/dataset/
"""
import json
import logging
from decimal import Decimal
from pathlib import Path
from typing import Any
import httpx
from portfolio_app.toronto.schemas import (
AmenityRecord,
AmenityType,
CensusRecord,
NeighbourhoodRecord,
)
logger = logging.getLogger(__name__)
class TorontoOpenDataParser:
"""Parser for Toronto Open Data CKAN API.
Provides methods to fetch and parse neighbourhood boundaries, census profiles,
and amenities (parks, schools, childcare) from the Toronto Open Data portal.
"""
BASE_URL = "https://ckan0.cf.opendata.inter.prod-toronto.ca"
API_PATH = "/api/3/action"
# Dataset package IDs
DATASETS = {
"neighbourhoods": "neighbourhoods",
"neighbourhood_profiles": "neighbourhood-profiles",
"parks": "parks",
"schools": "school-locations-all-types",
"childcare": "licensed-child-care-centres",
}
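    # Each ID above resolves through CKAN's package_show action, e.g.
    #     GET {BASE_URL}/api/3/action/package_show?id=neighbourhoods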
def __init__(
self,
cache_dir: Path | None = None,
timeout: float = 30.0,
) -> None:
"""Initialize parser.
Args:
cache_dir: Optional directory for caching API responses.
timeout: HTTP request timeout in seconds.
"""
self._cache_dir = cache_dir
self._timeout = timeout
self._client: httpx.Client | None = None
@property
def client(self) -> httpx.Client:
"""Lazy-initialize HTTP client."""
if self._client is None:
self._client = httpx.Client(
base_url=self.BASE_URL,
timeout=self._timeout,
headers={"Accept": "application/json"},
)
return self._client
def close(self) -> None:
"""Close HTTP client."""
if self._client is not None:
self._client.close()
self._client = None
def __enter__(self) -> "TorontoOpenDataParser":
return self
def __exit__(self, *args: Any) -> None:
self.close()
def _get_package(self, package_id: str) -> dict[str, Any]:
"""Fetch package metadata from CKAN.
Args:
package_id: The package/dataset ID.
Returns:
Package metadata dictionary.
"""
response = self.client.get(
f"{self.API_PATH}/package_show",
params={"id": package_id},
)
response.raise_for_status()
result = response.json()
if not result.get("success"):
raise ValueError(f"CKAN API error: {result.get('error', 'Unknown error')}")
return dict(result["result"])
def _get_resource_url(
self,
package_id: str,
format_filter: str = "geojson",
) -> str:
"""Get the download URL for a resource in a package.
Args:
package_id: The package/dataset ID.
format_filter: Resource format to filter by (e.g., 'geojson', 'csv').
Returns:
Resource download URL.
Raises:
ValueError: If no matching resource is found.
"""
package = self._get_package(package_id)
resources = package.get("resources", [])
for resource in resources:
resource_format = resource.get("format", "").lower()
if format_filter.lower() in resource_format:
return str(resource["url"])
available = [r.get("format") for r in resources]
raise ValueError(
f"No {format_filter} resource in {package_id}. Available: {available}"
)
def _fetch_geojson(self, package_id: str) -> dict[str, Any]:
"""Fetch GeoJSON data from a package.
Args:
package_id: The package/dataset ID.
Returns:
GeoJSON FeatureCollection.
"""
# Check cache first
if self._cache_dir:
cache_file = self._cache_dir / f"{package_id}.geojson"
if cache_file.exists():
logger.debug(f"Loading {package_id} from cache")
with open(cache_file, encoding="utf-8") as f:
return dict(json.load(f))
url = self._get_resource_url(package_id, format_filter="geojson")
logger.info(f"Fetching GeoJSON from {url}")
response = self.client.get(url)
response.raise_for_status()
data = response.json()
# Cache the response
if self._cache_dir:
self._cache_dir.mkdir(parents=True, exist_ok=True)
cache_file = self._cache_dir / f"{package_id}.geojson"
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(data, f)
return dict(data)
def _fetch_csv_as_json(self, package_id: str) -> list[dict[str, Any]]:
"""Fetch CSV data as JSON records via CKAN datastore.
Args:
package_id: The package/dataset ID.
Returns:
List of records as dictionaries.
"""
package = self._get_package(package_id)
resources = package.get("resources", [])
# Find a datastore-enabled resource
for resource in resources:
if resource.get("datastore_active"):
resource_id = resource["id"]
break
else:
raise ValueError(f"No datastore resource in {package_id}")
# Fetch all records via datastore_search
records: list[dict[str, Any]] = []
offset = 0
limit = 1000
while True:
response = self.client.get(
f"{self.API_PATH}/datastore_search",
params={"id": resource_id, "limit": limit, "offset": offset},
)
response.raise_for_status()
result = response.json()
if not result.get("success"):
raise ValueError(f"Datastore error: {result.get('error')}")
batch = result["result"]["records"]
records.extend(batch)
if len(batch) < limit:
break
offset += limit
return records
def get_neighbourhoods(self) -> list[NeighbourhoodRecord]:
"""Fetch 158 Toronto neighbourhood boundaries.
Returns:
List of validated NeighbourhoodRecord objects.
"""
geojson = self._fetch_geojson(self.DATASETS["neighbourhoods"])
features = geojson.get("features", [])
records = []
for feature in features:
props = feature.get("properties", {})
geometry = feature.get("geometry")
# Extract area_id from various possible property names
            area_id = props.get("AREA_ID") or props.get("area_id")
            if area_id is None:
                # Fall back to the numeric part of AREA_SHORT_CODE
                digits = "".join(
                    c for c in str(props.get("AREA_SHORT_CODE", "")) if c.isdigit()
                )
                if not digits:
                    logger.warning("Skipping feature without a usable area ID")
                    continue
                area_id = int(digits)
area_name = (
props.get("AREA_NAME")
or props.get("area_name")
or f"Neighbourhood {area_id}"
)
area_short_code = props.get("AREA_SHORT_CODE") or props.get(
"area_short_code"
)
records.append(
NeighbourhoodRecord(
area_id=int(area_id),
area_name=str(area_name),
area_short_code=area_short_code,
geometry=geometry,
)
)
logger.info(f"Parsed {len(records)} neighbourhoods")
return records
def get_census_profiles(self, year: int = 2021) -> list[CensusRecord]:
"""Fetch neighbourhood census profiles.
Note: Census profile data structure varies by year. This method
extracts key demographic indicators where available.
Args:
year: Census year (2016 or 2021).
Returns:
List of validated CensusRecord objects.
"""
# Census profiles are typically in CSV/datastore format
try:
raw_records = self._fetch_csv_as_json(
self.DATASETS["neighbourhood_profiles"]
)
except ValueError as e:
logger.warning(f"Could not fetch census profiles: {e}")
return []
# Census profiles are pivoted - rows are indicators, columns are neighbourhoods
# This requires special handling based on the actual data structure
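        # Illustrative row shape (an assumption, not the exact schema):
        #     {"Category": "Population", "Characteristic": "Population, 2021",
        #      "City of Toronto": "2794356", "Agincourt North": "29113", ...}
        # so building CensusRecord objects means transposing the
        # per-neighbourhood columns into per-neighbourhood rows.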
logger.info(f"Fetched {len(raw_records)} census profile rows")
# For now, return empty list - actual implementation depends on data structure
# TODO: Implement census profile parsing based on actual data format
return []
def get_parks(self) -> list[AmenityRecord]:
"""Fetch park locations.
Returns:
List of validated AmenityRecord objects.
"""
return self._fetch_amenities(
self.DATASETS["parks"],
AmenityType.PARK,
name_field="ASSET_NAME",
address_field="ADDRESS_FULL",
)
def get_schools(self) -> list[AmenityRecord]:
"""Fetch school locations.
Returns:
List of validated AmenityRecord objects.
"""
return self._fetch_amenities(
self.DATASETS["schools"],
AmenityType.SCHOOL,
name_field="NAME",
address_field="ADDRESS_FULL",
)
def get_childcare_centres(self) -> list[AmenityRecord]:
"""Fetch licensed childcare centre locations.
Returns:
List of validated AmenityRecord objects.
"""
return self._fetch_amenities(
self.DATASETS["childcare"],
AmenityType.CHILDCARE,
name_field="LOC_NAME",
address_field="ADDRESS",
)
def _fetch_amenities(
self,
package_id: str,
amenity_type: AmenityType,
name_field: str,
address_field: str,
) -> list[AmenityRecord]:
"""Fetch and parse amenity data from GeoJSON.
Args:
package_id: CKAN package ID.
amenity_type: Type of amenity.
name_field: Property name containing amenity name.
address_field: Property name containing address.
Returns:
List of AmenityRecord objects.
"""
try:
geojson = self._fetch_geojson(package_id)
except (httpx.HTTPError, ValueError) as e:
logger.warning(f"Could not fetch {package_id}: {e}")
return []
features = geojson.get("features", [])
records = []
for feature in features:
props = feature.get("properties", {})
geometry = feature.get("geometry")
# Get coordinates from geometry
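            # GeoJSON Point coordinates are ordered [longitude, latitude]
            # (RFC 7946), hence the swap below.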
lat, lon = None, None
if geometry and geometry.get("type") == "Point":
coords = geometry.get("coordinates", [])
if len(coords) >= 2:
lon, lat = coords[0], coords[1]
# Try to determine neighbourhood_id
# Many datasets include AREA_ID or similar
neighbourhood_id = (
props.get("AREA_ID")
or props.get("area_id")
or props.get("NEIGHBOURHOOD_ID")
or 0 # Will need spatial join if not available
)
name = props.get(name_field) or props.get(name_field.lower()) or "Unknown"
address = props.get(address_field) or props.get(address_field.lower())
# Skip if we don't have a neighbourhood assignment
if neighbourhood_id == 0:
continue
records.append(
AmenityRecord(
neighbourhood_id=int(neighbourhood_id),
amenity_type=amenity_type,
amenity_name=str(name)[:200],
address=str(address)[:300] if address else None,
                    latitude=Decimal(str(lat)) if lat is not None else None,
                    longitude=Decimal(str(lon)) if lon is not None else None,
)
)
logger.info(f"Parsed {len(records)} {amenity_type.value} records")
return records

parsers/toronto_police.py

@@ -0,0 +1,371 @@
"""Parser for Toronto Police crime data via CKAN API.
Fetches neighbourhood crime rates and major crime indicators from the
Toronto Police Service data hosted on Toronto Open Data Portal.
Data Sources:
- Neighbourhood Crime Rates: Annual crime rates by neighbourhood
- Major Crime Indicators (MCI): Detailed incident-level data
"""
import contextlib
import json
import logging
from decimal import Decimal, InvalidOperation
from typing import Any
import httpx
from portfolio_app.toronto.schemas import CrimeRecord, CrimeType
logger = logging.getLogger(__name__)
# Mapping from Toronto Police crime categories to CrimeType enum.
# _normalize_crime_type() lowercases the input and converts "-"/"_" to
# spaces before lookup, so keys here never need underscore variants.
CRIME_TYPE_MAPPING: dict[str, CrimeType] = {
    "assault": CrimeType.ASSAULT,
    "assaults": CrimeType.ASSAULT,
    "auto theft": CrimeType.AUTO_THEFT,
    "autotheft": CrimeType.AUTO_THEFT,
    "break and enter": CrimeType.BREAK_AND_ENTER,
    "breakenter": CrimeType.BREAK_AND_ENTER,
    "homicide": CrimeType.HOMICIDE,
    "homicides": CrimeType.HOMICIDE,
    "robbery": CrimeType.ROBBERY,
    "robberies": CrimeType.ROBBERY,
    "shooting": CrimeType.SHOOTING,
    "shootings": CrimeType.SHOOTING,
    "theft over": CrimeType.THEFT_OVER,
    "theftover": CrimeType.THEFT_OVER,
    "theft from motor vehicle": CrimeType.THEFT_FROM_MOTOR_VEHICLE,
    "theftfrommv": CrimeType.THEFT_FROM_MOTOR_VEHICLE,
    "theft from mv": CrimeType.THEFT_FROM_MOTOR_VEHICLE,
}
def _normalize_crime_type(crime_str: str) -> CrimeType:
"""Normalize crime type string to CrimeType enum.
Args:
crime_str: Raw crime type string from data source.
Returns:
Matched CrimeType enum value, or CrimeType.OTHER if no match.
"""
normalized = crime_str.lower().strip().replace("-", " ").replace("_", " ")
return CRIME_TYPE_MAPPING.get(normalized, CrimeType.OTHER)
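# e.g. _normalize_crime_type("Break-and-Enter") -> CrimeType.BREAK_AND_ENTER
#      _normalize_crime_type("Arson") -> CrimeType.OTHER (unmapped)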
class TorontoPoliceParser:
"""Parser for Toronto Police crime data via CKAN API.
Crime data is hosted on Toronto Open Data Portal but sourced from
Toronto Police Service.
"""
BASE_URL = "https://ckan0.cf.opendata.inter.prod-toronto.ca"
API_PATH = "/api/3/action"
# Dataset package IDs
DATASETS = {
"crime_rates": "neighbourhood-crime-rates",
"mci": "major-crime-indicators",
"shootings": "shootings-firearm-discharges",
}
def __init__(self, timeout: float = 30.0) -> None:
"""Initialize parser.
Args:
timeout: HTTP request timeout in seconds.
"""
self._timeout = timeout
self._client: httpx.Client | None = None
@property
def client(self) -> httpx.Client:
"""Lazy-initialize HTTP client."""
if self._client is None:
self._client = httpx.Client(
base_url=self.BASE_URL,
timeout=self._timeout,
headers={"Accept": "application/json"},
)
return self._client
def close(self) -> None:
"""Close HTTP client."""
if self._client is not None:
self._client.close()
self._client = None
def __enter__(self) -> "TorontoPoliceParser":
return self
def __exit__(self, *args: Any) -> None:
self.close()
def _get_package(self, package_id: str) -> dict[str, Any]:
"""Fetch package metadata from CKAN."""
response = self.client.get(
f"{self.API_PATH}/package_show",
params={"id": package_id},
)
response.raise_for_status()
result = response.json()
if not result.get("success"):
raise ValueError(f"CKAN API error: {result.get('error', 'Unknown error')}")
return dict(result["result"])
def _fetch_datastore_records(
self,
package_id: str,
filters: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
"""Fetch records from CKAN datastore.
Args:
package_id: CKAN package ID.
filters: Optional filters to apply.
Returns:
List of records as dictionaries.
"""
package = self._get_package(package_id)
resources = package.get("resources", [])
# Find datastore-enabled resource
resource_id = None
for resource in resources:
if resource.get("datastore_active"):
resource_id = resource["id"]
break
if not resource_id:
raise ValueError(f"No datastore resource in {package_id}")
# Fetch all records
records: list[dict[str, Any]] = []
offset = 0
limit = 1000
while True:
params: dict[str, Any] = {
"id": resource_id,
"limit": limit,
"offset": offset,
}
            if filters:
                # CKAN expects filters as a JSON-encoded dict, not a Python repr
                params["filters"] = json.dumps(filters)
response = self.client.get(
f"{self.API_PATH}/datastore_search",
params=params,
)
response.raise_for_status()
result = response.json()
if not result.get("success"):
raise ValueError(f"Datastore error: {result.get('error')}")
batch = result["result"]["records"]
records.extend(batch)
if len(batch) < limit:
break
offset += limit
return records
def get_crime_rates(
self,
years: list[int] | None = None,
) -> list[CrimeRecord]:
"""Fetch neighbourhood crime rates.
The crime rates dataset contains annual counts and rates per 100k
population for each neighbourhood.
Args:
years: Optional list of years to filter. If None, fetches all.
Returns:
List of validated CrimeRecord objects.
"""
try:
raw_records = self._fetch_datastore_records(self.DATASETS["crime_rates"])
except (httpx.HTTPError, ValueError) as e:
logger.warning(f"Could not fetch crime rates: {e}")
return []
records = []
for row in raw_records:
# Extract neighbourhood ID (Hood_ID maps to AREA_ID)
hood_id = row.get("HOOD_ID") or row.get("Hood_ID") or row.get("hood_id")
if not hood_id:
continue
try:
neighbourhood_id = int(hood_id)
except (ValueError, TypeError):
continue
# Crime rate data typically has columns like:
# ASSAULT_2019, ASSAULT_RATE_2019, AUTOTHEFT_2020, etc.
# We need to parse column names to extract crime type and year
for col_name, value in row.items():
if value is None or col_name in (
"_id",
"HOOD_ID",
"Hood_ID",
"hood_id",
"AREA_NAME",
"NEIGHBOURHOOD",
):
continue
# Try to parse column name for crime type and year
# Pattern: CRIMETYPE_YEAR or CRIMETYPE_RATE_YEAR
parts = col_name.upper().split("_")
if len(parts) < 2:
continue
# Check if last part is a year
try:
year = int(parts[-1])
if year < 2014 or year > 2030:
continue
except ValueError:
continue
# Filter by years if specified
if years and year not in years:
continue
# Check if this is a rate column
is_rate = "RATE" in parts
# Extract crime type (everything before RATE/year)
if is_rate:
rate_idx = parts.index("RATE")
crime_type_str = "_".join(parts[:rate_idx])
else:
crime_type_str = "_".join(parts[:-1])
crime_type = _normalize_crime_type(crime_type_str)
                try:
                    numeric_value = Decimal(str(value))
                except (InvalidOperation, ValueError, TypeError):
                    continue
if is_rate:
# This is a rate column - look for corresponding count
# We'll skip rate-only entries and create records from counts
continue
# Find corresponding rate if available
rate_col = f"{crime_type_str}_RATE_{year}"
rate_value = row.get(rate_col)
rate_per_100k = None
if rate_value is not None:
                    with contextlib.suppress(InvalidOperation, ValueError, TypeError):
                        rate_per_100k = Decimal(str(rate_value))
records.append(
CrimeRecord(
neighbourhood_id=neighbourhood_id,
year=year,
crime_type=crime_type,
count=int(numeric_value),
rate_per_100k=rate_per_100k,
)
)
logger.info(f"Parsed {len(records)} crime rate records")
return records
def get_major_crime_indicators(
self,
years: list[int] | None = None,
) -> list[CrimeRecord]:
"""Fetch major crime indicators (detailed MCI data).
MCI data contains incident-level records that need to be aggregated
by neighbourhood and year.
Args:
years: Optional list of years to filter.
Returns:
List of aggregated CrimeRecord objects.
"""
try:
raw_records = self._fetch_datastore_records(self.DATASETS["mci"])
except (httpx.HTTPError, ValueError) as e:
logger.warning(f"Could not fetch MCI data: {e}")
return []
# Aggregate counts by neighbourhood, year, and crime type
aggregates: dict[tuple[int, int, CrimeType], int] = {}
for row in raw_records:
# Extract neighbourhood ID
hood_id = (
row.get("HOOD_158")
or row.get("HOOD_140")
or row.get("HOOD_ID")
or row.get("Hood_ID")
)
if not hood_id:
continue
try:
neighbourhood_id = int(hood_id)
except (ValueError, TypeError):
continue
# Extract year from occurrence date
occ_year = row.get("OCC_YEAR") or row.get("REPORT_YEAR")
if not occ_year:
continue
try:
                year = int(float(occ_year))  # tolerate numeric strings like "2019.0"
if year < 2014 or year > 2030:
continue
except (ValueError, TypeError):
continue
# Filter by years if specified
if years and year not in years:
continue
# Extract crime type
mci_category = row.get("MCI_CATEGORY") or row.get("OFFENCE") or ""
crime_type = _normalize_crime_type(str(mci_category))
# Aggregate count
key = (neighbourhood_id, year, crime_type)
aggregates[key] = aggregates.get(key, 0) + 1
# Convert aggregates to CrimeRecord objects
records = [
CrimeRecord(
neighbourhood_id=neighbourhood_id,
year=year,
crime_type=crime_type,
count=count,
rate_per_100k=None, # Would need population data to calculate
)
for (neighbourhood_id, year, crime_type), count in aggregates.items()
]
logger.info(f"Parsed {len(records)} MCI records (aggregated)")
return records