From 053acf64362591e7121d0811088f02550e6e71a4 Mon Sep 17 00:00:00 2001 From: lmiranda Date: Fri, 16 Jan 2026 11:07:13 -0500 Subject: [PATCH] feat: Implement Phase 3 neighbourhood data model Add schemas, parsers, loaders, and models for Toronto neighbourhood-centric data including census profiles, crime statistics, and amenities. Schemas: - NeighbourhoodRecord, CensusRecord, CrimeRecord, CrimeType - AmenityType, AmenityRecord, AmenityCount Models: - BridgeCMHCNeighbourhood (zone-to-neighbourhood mapping with weights) - FactCensus, FactCrime, FactAmenities Parsers: - TorontoOpenDataParser (CKAN API for neighbourhoods, census, amenities) - TorontoPoliceParser (crime rates, MCI data) Loaders: - load_census_data, load_crime_data, load_amenities - build_cmhc_neighbourhood_crosswalk (PostGIS area weights) Also updates CLAUDE.md with projman plugin workflow documentation. Closes #53, #54, #55, #56, #57, #58, #59 Co-Authored-By: Claude Opus 4.5 --- CLAUDE.md | 49 +++ portfolio_app/toronto/loaders/__init__.py | 17 + portfolio_app/toronto/loaders/amenities.py | 93 +++++ portfolio_app/toronto/loaders/census.py | 68 +++ .../toronto/loaders/cmhc_crosswalk.py | 131 ++++++ portfolio_app/toronto/loaders/crime.py | 45 ++ portfolio_app/toronto/models/__init__.py | 13 +- portfolio_app/toronto/models/facts.py | 108 ++++- portfolio_app/toronto/parsers/__init__.py | 5 + .../toronto/parsers/toronto_open_data.py | 391 ++++++++++++++++++ .../toronto/parsers/toronto_police.py | 371 +++++++++++++++++ portfolio_app/toronto/schemas/__init__.py | 11 + portfolio_app/toronto/schemas/amenities.py | 60 +++ .../toronto/schemas/neighbourhood.py | 106 +++++ 14 files changed, 1466 insertions(+), 2 deletions(-) create mode 100644 portfolio_app/toronto/loaders/amenities.py create mode 100644 portfolio_app/toronto/loaders/census.py create mode 100644 portfolio_app/toronto/loaders/cmhc_crosswalk.py create mode 100644 portfolio_app/toronto/loaders/crime.py create mode 100644 portfolio_app/toronto/parsers/toronto_open_data.py create mode 100644 portfolio_app/toronto/parsers/toronto_police.py create mode 100644 portfolio_app/toronto/schemas/amenities.py create mode 100644 portfolio_app/toronto/schemas/neighbourhood.py diff --git a/CLAUDE.md b/CLAUDE.md index 1a8f644..7814622 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -261,4 +261,53 @@ All scripts in `scripts/`: --- +## Projman Plugin Workflow + +**CRITICAL: Always use the projman plugin for sprint and task management.** + +### When to Use Projman Skills + +| Skill | Trigger | Purpose | +|-------|---------|---------| +| `/projman:sprint-plan` | New sprint or phase implementation | Architecture analysis + Gitea issue creation | +| `/projman:sprint-start` | Beginning implementation work | Load lessons learned, start execution | +| `/projman:sprint-status` | Check progress | Review blockers and completion status | +| `/projman:sprint-close` | Sprint completion | Capture lessons learned to Wiki.js | + +### Default Behavior + +When user requests implementation work: + +1. **ALWAYS start with `/projman:sprint-plan`** before writing code +2. Create Gitea issues with proper labels and acceptance criteria +3. Use `/projman:sprint-start` to begin execution with lessons learned +4. Track progress via Gitea issue comments +5. Close sprint with `/projman:sprint-close` to document lessons + +### Gitea Repository + +- **Repo**: `lmiranda/personal-portfolio` +- **Host**: `gitea.hotserv.cloud` +- **Note**: `lmiranda` is a user account (not org), so label lookup may require repo-level labels + +### MCP Tools Available + +**Gitea**: +- `list_issues`, `get_issue`, `create_issue`, `update_issue`, `add_comment` +- `get_labels`, `suggest_labels` + +**Wiki.js**: +- `search_lessons`, `create_lesson`, `search_pages`, `get_page` + +### Issue Structure + +Every Gitea issue should include: +- **Overview**: Brief description +- **Files to Create/Modify**: Explicit paths +- **Acceptance Criteria**: Checkboxes +- **Technical Notes**: Implementation hints +- **Labels**: Listed in body (workaround for label API issues) + +--- + *Last Updated: Sprint 9* diff --git a/portfolio_app/toronto/loaders/__init__.py b/portfolio_app/toronto/loaders/__init__.py index 3574070..b977d1d 100644 --- a/portfolio_app/toronto/loaders/__init__.py +++ b/portfolio_app/toronto/loaders/__init__.py @@ -1,7 +1,15 @@ """Database loaders for Toronto housing data.""" +from .amenities import load_amenities, load_amenity_counts from .base import bulk_insert, get_session, upsert_by_key +from .census import load_census_data from .cmhc import load_cmhc_record, load_cmhc_rentals +from .cmhc_crosswalk import ( + build_cmhc_neighbourhood_crosswalk, + disaggregate_zone_value, + get_neighbourhood_weights_for_zone, +) +from .crime import load_crime_data from .dimensions import ( generate_date_key, load_cmhc_zones, @@ -24,4 +32,13 @@ __all__ = [ # Fact loaders "load_cmhc_rentals", "load_cmhc_record", + # Phase 3 loaders + "load_census_data", + "load_crime_data", + "load_amenities", + "load_amenity_counts", + # CMHC crosswalk + "build_cmhc_neighbourhood_crosswalk", + "get_neighbourhood_weights_for_zone", + "disaggregate_zone_value", ] diff --git a/portfolio_app/toronto/loaders/amenities.py b/portfolio_app/toronto/loaders/amenities.py new file mode 100644 index 0000000..db3e2b3 --- /dev/null +++ b/portfolio_app/toronto/loaders/amenities.py @@ -0,0 +1,93 @@ +"""Loader for amenities data to fact_amenities table.""" + +from collections import Counter + +from sqlalchemy.orm import Session + +from portfolio_app.toronto.models import FactAmenities +from portfolio_app.toronto.schemas import AmenityCount, AmenityRecord + +from .base import get_session, upsert_by_key + + +def load_amenities( + records: list[AmenityRecord], + year: int, + session: Session | None = None, +) -> int: + """Load amenity records to fact_amenities table. + + Aggregates individual amenity records into counts by neighbourhood + and amenity type before loading. + + Args: + records: List of validated AmenityRecord schemas. + year: Year to associate with the amenity counts. + session: Optional existing session. + + Returns: + Number of records loaded (inserted + updated). + """ + # Aggregate records by neighbourhood and amenity type + counts: Counter[tuple[int, str]] = Counter() + for r in records: + key = (r.neighbourhood_id, r.amenity_type.value) + counts[key] += 1 + + # Convert to AmenityCount schemas then to models + def _load(sess: Session) -> int: + models = [] + for (neighbourhood_id, amenity_type), count in counts.items(): + model = FactAmenities( + neighbourhood_id=neighbourhood_id, + amenity_type=amenity_type, + count=count, + year=year, + ) + models.append(model) + + inserted, updated = upsert_by_key( + sess, FactAmenities, models, ["neighbourhood_id", "amenity_type", "year"] + ) + return inserted + updated + + if session: + return _load(session) + with get_session() as sess: + return _load(sess) + + +def load_amenity_counts( + records: list[AmenityCount], + session: Session | None = None, +) -> int: + """Load pre-aggregated amenity counts to fact_amenities table. + + Args: + records: List of validated AmenityCount schemas. + session: Optional existing session. + + Returns: + Number of records loaded (inserted + updated). + """ + + def _load(sess: Session) -> int: + models = [] + for r in records: + model = FactAmenities( + neighbourhood_id=r.neighbourhood_id, + amenity_type=r.amenity_type.value, + count=r.count, + year=r.year, + ) + models.append(model) + + inserted, updated = upsert_by_key( + sess, FactAmenities, models, ["neighbourhood_id", "amenity_type", "year"] + ) + return inserted + updated + + if session: + return _load(session) + with get_session() as sess: + return _load(sess) diff --git a/portfolio_app/toronto/loaders/census.py b/portfolio_app/toronto/loaders/census.py new file mode 100644 index 0000000..afcb0b4 --- /dev/null +++ b/portfolio_app/toronto/loaders/census.py @@ -0,0 +1,68 @@ +"""Loader for census data to fact_census table.""" + +from sqlalchemy.orm import Session + +from portfolio_app.toronto.models import FactCensus +from portfolio_app.toronto.schemas import CensusRecord + +from .base import get_session, upsert_by_key + + +def load_census_data( + records: list[CensusRecord], + session: Session | None = None, +) -> int: + """Load census records to fact_census table. + + Args: + records: List of validated CensusRecord schemas. + session: Optional existing session. + + Returns: + Number of records loaded (inserted + updated). + """ + + def _load(sess: Session) -> int: + models = [] + for r in records: + model = FactCensus( + neighbourhood_id=r.neighbourhood_id, + census_year=r.census_year, + population=r.population, + population_density=float(r.population_density) + if r.population_density + else None, + median_household_income=float(r.median_household_income) + if r.median_household_income + else None, + average_household_income=float(r.average_household_income) + if r.average_household_income + else None, + unemployment_rate=float(r.unemployment_rate) + if r.unemployment_rate + else None, + pct_bachelors_or_higher=float(r.pct_bachelors_or_higher) + if r.pct_bachelors_or_higher + else None, + pct_owner_occupied=float(r.pct_owner_occupied) + if r.pct_owner_occupied + else None, + pct_renter_occupied=float(r.pct_renter_occupied) + if r.pct_renter_occupied + else None, + median_age=float(r.median_age) if r.median_age else None, + average_dwelling_value=float(r.average_dwelling_value) + if r.average_dwelling_value + else None, + ) + models.append(model) + + inserted, updated = upsert_by_key( + sess, FactCensus, models, ["neighbourhood_id", "census_year"] + ) + return inserted + updated + + if session: + return _load(session) + with get_session() as sess: + return _load(sess) diff --git a/portfolio_app/toronto/loaders/cmhc_crosswalk.py b/portfolio_app/toronto/loaders/cmhc_crosswalk.py new file mode 100644 index 0000000..b856d52 --- /dev/null +++ b/portfolio_app/toronto/loaders/cmhc_crosswalk.py @@ -0,0 +1,131 @@ +"""Loader for CMHC zone to neighbourhood crosswalk with area weights.""" + +from sqlalchemy import text +from sqlalchemy.orm import Session + +from .base import get_session + + +def build_cmhc_neighbourhood_crosswalk( + session: Session | None = None, +) -> int: + """Calculate area overlap weights between CMHC zones and neighbourhoods. + + Uses PostGIS ST_Intersection and ST_Area functions to compute the + proportion of each CMHC zone that overlaps with each neighbourhood. + This enables disaggregation of CMHC zone-level data to neighbourhood level. + + The function is idempotent - it clears existing crosswalk data before + rebuilding. + + Args: + session: Optional existing session. + + Returns: + Number of bridge records created. + + Note: + Requires both dim_cmhc_zone and dim_neighbourhood tables to have + geometry columns populated with valid PostGIS geometries. + """ + + def _build(sess: Session) -> int: + # Clear existing crosswalk data + sess.execute(text("DELETE FROM bridge_cmhc_neighbourhood")) + + # Calculate overlap weights using PostGIS + # Weight = area of intersection / total area of CMHC zone + crosswalk_query = text( + """ + INSERT INTO bridge_cmhc_neighbourhood (cmhc_zone_code, neighbourhood_id, weight) + SELECT + z.zone_code, + n.neighbourhood_id, + CASE + WHEN ST_Area(z.geometry::geography) > 0 THEN + ST_Area(ST_Intersection(z.geometry, n.geometry)::geography) / + ST_Area(z.geometry::geography) + ELSE 0 + END as weight + FROM dim_cmhc_zone z + JOIN dim_neighbourhood n + ON ST_Intersects(z.geometry, n.geometry) + WHERE + z.geometry IS NOT NULL + AND n.geometry IS NOT NULL + AND ST_Area(ST_Intersection(z.geometry, n.geometry)::geography) > 0 + """ + ) + + sess.execute(crosswalk_query) + + # Count records created + count_result = sess.execute( + text("SELECT COUNT(*) FROM bridge_cmhc_neighbourhood") + ) + count = count_result.scalar() or 0 + + return int(count) + + if session: + return _build(session) + with get_session() as sess: + return _build(sess) + + +def get_neighbourhood_weights_for_zone( + zone_code: str, + session: Session | None = None, +) -> list[tuple[int, float]]: + """Get neighbourhood weights for a specific CMHC zone. + + Args: + zone_code: CMHC zone code. + session: Optional existing session. + + Returns: + List of (neighbourhood_id, weight) tuples. + """ + + def _get(sess: Session) -> list[tuple[int, float]]: + result = sess.execute( + text( + """ + SELECT neighbourhood_id, weight + FROM bridge_cmhc_neighbourhood + WHERE cmhc_zone_code = :zone_code + ORDER BY weight DESC + """ + ), + {"zone_code": zone_code}, + ) + return [(int(row[0]), float(row[1])) for row in result] + + if session: + return _get(session) + with get_session() as sess: + return _get(sess) + + +def disaggregate_zone_value( + zone_code: str, + value: float, + session: Session | None = None, +) -> dict[int, float]: + """Disaggregate a CMHC zone value to neighbourhoods using weights. + + Args: + zone_code: CMHC zone code. + value: Value to disaggregate (e.g., average rent). + session: Optional existing session. + + Returns: + Dictionary mapping neighbourhood_id to weighted value. + + Note: + For averages (like rent), the weighted value represents the + contribution from this zone. To get a neighbourhood's total, + sum contributions from all overlapping zones. + """ + weights = get_neighbourhood_weights_for_zone(zone_code, session) + return {neighbourhood_id: value * weight for neighbourhood_id, weight in weights} diff --git a/portfolio_app/toronto/loaders/crime.py b/portfolio_app/toronto/loaders/crime.py new file mode 100644 index 0000000..29c4522 --- /dev/null +++ b/portfolio_app/toronto/loaders/crime.py @@ -0,0 +1,45 @@ +"""Loader for crime data to fact_crime table.""" + +from sqlalchemy.orm import Session + +from portfolio_app.toronto.models import FactCrime +from portfolio_app.toronto.schemas import CrimeRecord + +from .base import get_session, upsert_by_key + + +def load_crime_data( + records: list[CrimeRecord], + session: Session | None = None, +) -> int: + """Load crime records to fact_crime table. + + Args: + records: List of validated CrimeRecord schemas. + session: Optional existing session. + + Returns: + Number of records loaded (inserted + updated). + """ + + def _load(sess: Session) -> int: + models = [] + for r in records: + model = FactCrime( + neighbourhood_id=r.neighbourhood_id, + year=r.year, + crime_type=r.crime_type.value, + count=r.count, + rate_per_100k=float(r.rate_per_100k) if r.rate_per_100k else None, + ) + models.append(model) + + inserted, updated = upsert_by_key( + sess, FactCrime, models, ["neighbourhood_id", "year", "crime_type"] + ) + return inserted + updated + + if session: + return _load(session) + with get_session() as sess: + return _load(sess) diff --git a/portfolio_app/toronto/models/__init__.py b/portfolio_app/toronto/models/__init__.py index 44c5ceb..3fb386c 100644 --- a/portfolio_app/toronto/models/__init__.py +++ b/portfolio_app/toronto/models/__init__.py @@ -7,7 +7,13 @@ from .dimensions import ( DimPolicyEvent, DimTime, ) -from .facts import FactRentals +from .facts import ( + BridgeCMHCNeighbourhood, + FactAmenities, + FactCensus, + FactCrime, + FactRentals, +) __all__ = [ # Base @@ -22,4 +28,9 @@ __all__ = [ "DimPolicyEvent", # Facts "FactRentals", + "FactCensus", + "FactCrime", + "FactAmenities", + # Bridge tables + "BridgeCMHCNeighbourhood", ] diff --git a/portfolio_app/toronto/models/facts.py b/portfolio_app/toronto/models/facts.py index 38e660e..2f1d5bd 100644 --- a/portfolio_app/toronto/models/facts.py +++ b/portfolio_app/toronto/models/facts.py @@ -1,11 +1,117 @@ """SQLAlchemy models for fact tables.""" -from sqlalchemy import ForeignKey, Integer, Numeric, String +from sqlalchemy import ForeignKey, Index, Integer, Numeric, String from sqlalchemy.orm import Mapped, mapped_column, relationship from .base import Base +class BridgeCMHCNeighbourhood(Base): + """Bridge table for CMHC zone to neighbourhood mapping with area weights. + + Enables disaggregation of CMHC zone-level rental data to neighbourhood level + using area-based proportional weights computed via PostGIS. + """ + + __tablename__ = "bridge_cmhc_neighbourhood" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + cmhc_zone_code: Mapped[str] = mapped_column(String(10), nullable=False) + neighbourhood_id: Mapped[int] = mapped_column(Integer, nullable=False) + weight: Mapped[float] = mapped_column( + Numeric(5, 4), nullable=False + ) # 0.0000 to 1.0000 + + __table_args__ = ( + Index("ix_bridge_cmhc_zone", "cmhc_zone_code"), + Index("ix_bridge_neighbourhood", "neighbourhood_id"), + ) + + +class FactCensus(Base): + """Census statistics by neighbourhood and year. + + Grain: One row per neighbourhood per census year. + """ + + __tablename__ = "fact_census" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + neighbourhood_id: Mapped[int] = mapped_column(Integer, nullable=False) + census_year: Mapped[int] = mapped_column(Integer, nullable=False) + population: Mapped[int | None] = mapped_column(Integer, nullable=True) + population_density: Mapped[float | None] = mapped_column( + Numeric(10, 2), nullable=True + ) + median_household_income: Mapped[float | None] = mapped_column( + Numeric(12, 2), nullable=True + ) + average_household_income: Mapped[float | None] = mapped_column( + Numeric(12, 2), nullable=True + ) + unemployment_rate: Mapped[float | None] = mapped_column( + Numeric(5, 2), nullable=True + ) + pct_bachelors_or_higher: Mapped[float | None] = mapped_column( + Numeric(5, 2), nullable=True + ) + pct_owner_occupied: Mapped[float | None] = mapped_column( + Numeric(5, 2), nullable=True + ) + pct_renter_occupied: Mapped[float | None] = mapped_column( + Numeric(5, 2), nullable=True + ) + median_age: Mapped[float | None] = mapped_column(Numeric(5, 2), nullable=True) + average_dwelling_value: Mapped[float | None] = mapped_column( + Numeric(12, 2), nullable=True + ) + + __table_args__ = ( + Index("ix_fact_census_neighbourhood_year", "neighbourhood_id", "census_year"), + ) + + +class FactCrime(Base): + """Crime statistics by neighbourhood and year. + + Grain: One row per neighbourhood per year per crime type. + """ + + __tablename__ = "fact_crime" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + neighbourhood_id: Mapped[int] = mapped_column(Integer, nullable=False) + year: Mapped[int] = mapped_column(Integer, nullable=False) + crime_type: Mapped[str] = mapped_column(String(50), nullable=False) + count: Mapped[int] = mapped_column(Integer, nullable=False) + rate_per_100k: Mapped[float | None] = mapped_column(Numeric(10, 2), nullable=True) + + __table_args__ = ( + Index("ix_fact_crime_neighbourhood_year", "neighbourhood_id", "year"), + Index("ix_fact_crime_type", "crime_type"), + ) + + +class FactAmenities(Base): + """Amenity counts by neighbourhood. + + Grain: One row per neighbourhood per amenity type per year. + """ + + __tablename__ = "fact_amenities" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + neighbourhood_id: Mapped[int] = mapped_column(Integer, nullable=False) + amenity_type: Mapped[str] = mapped_column(String(50), nullable=False) + count: Mapped[int] = mapped_column(Integer, nullable=False) + year: Mapped[int] = mapped_column(Integer, nullable=False) + + __table_args__ = ( + Index("ix_fact_amenities_neighbourhood_year", "neighbourhood_id", "year"), + Index("ix_fact_amenities_type", "amenity_type"), + ) + + class FactRentals(Base): """Fact table for CMHC rental market data. diff --git a/portfolio_app/toronto/parsers/__init__.py b/portfolio_app/toronto/parsers/__init__.py index 02ea39d..50b4b39 100644 --- a/portfolio_app/toronto/parsers/__init__.py +++ b/portfolio_app/toronto/parsers/__init__.py @@ -6,6 +6,8 @@ from .geo import ( NeighbourhoodParser, load_geojson, ) +from .toronto_open_data import TorontoOpenDataParser +from .toronto_police import TorontoPoliceParser __all__ = [ "CMHCParser", @@ -13,4 +15,7 @@ __all__ = [ "CMHCZoneParser", "NeighbourhoodParser", "load_geojson", + # API parsers (Phase 3) + "TorontoOpenDataParser", + "TorontoPoliceParser", ] diff --git a/portfolio_app/toronto/parsers/toronto_open_data.py b/portfolio_app/toronto/parsers/toronto_open_data.py new file mode 100644 index 0000000..bbc58af --- /dev/null +++ b/portfolio_app/toronto/parsers/toronto_open_data.py @@ -0,0 +1,391 @@ +"""Parser for Toronto Open Data CKAN API. + +Fetches neighbourhood boundaries, census profiles, and amenities data +from the City of Toronto's Open Data Portal. + +API Documentation: https://open.toronto.ca/dataset/ +""" + +import json +import logging +from decimal import Decimal +from pathlib import Path +from typing import Any + +import httpx + +from portfolio_app.toronto.schemas import ( + AmenityRecord, + AmenityType, + CensusRecord, + NeighbourhoodRecord, +) + +logger = logging.getLogger(__name__) + + +class TorontoOpenDataParser: + """Parser for Toronto Open Data CKAN API. + + Provides methods to fetch and parse neighbourhood boundaries, census profiles, + and amenities (parks, schools, childcare) from the Toronto Open Data portal. + """ + + BASE_URL = "https://ckan0.cf.opendata.inter.prod-toronto.ca" + API_PATH = "/api/3/action" + + # Dataset package IDs + DATASETS = { + "neighbourhoods": "neighbourhoods", + "neighbourhood_profiles": "neighbourhood-profiles", + "parks": "parks", + "schools": "school-locations-all-types", + "childcare": "licensed-child-care-centres", + } + + def __init__( + self, + cache_dir: Path | None = None, + timeout: float = 30.0, + ) -> None: + """Initialize parser. + + Args: + cache_dir: Optional directory for caching API responses. + timeout: HTTP request timeout in seconds. + """ + self._cache_dir = cache_dir + self._timeout = timeout + self._client: httpx.Client | None = None + + @property + def client(self) -> httpx.Client: + """Lazy-initialize HTTP client.""" + if self._client is None: + self._client = httpx.Client( + base_url=self.BASE_URL, + timeout=self._timeout, + headers={"Accept": "application/json"}, + ) + return self._client + + def close(self) -> None: + """Close HTTP client.""" + if self._client is not None: + self._client.close() + self._client = None + + def __enter__(self) -> "TorontoOpenDataParser": + return self + + def __exit__(self, *args: Any) -> None: + self.close() + + def _get_package(self, package_id: str) -> dict[str, Any]: + """Fetch package metadata from CKAN. + + Args: + package_id: The package/dataset ID. + + Returns: + Package metadata dictionary. + """ + response = self.client.get( + f"{self.API_PATH}/package_show", + params={"id": package_id}, + ) + response.raise_for_status() + result = response.json() + + if not result.get("success"): + raise ValueError(f"CKAN API error: {result.get('error', 'Unknown error')}") + + return dict(result["result"]) + + def _get_resource_url( + self, + package_id: str, + format_filter: str = "geojson", + ) -> str: + """Get the download URL for a resource in a package. + + Args: + package_id: The package/dataset ID. + format_filter: Resource format to filter by (e.g., 'geojson', 'csv'). + + Returns: + Resource download URL. + + Raises: + ValueError: If no matching resource is found. + """ + package = self._get_package(package_id) + resources = package.get("resources", []) + + for resource in resources: + resource_format = resource.get("format", "").lower() + if format_filter.lower() in resource_format: + return str(resource["url"]) + + available = [r.get("format") for r in resources] + raise ValueError( + f"No {format_filter} resource in {package_id}. Available: {available}" + ) + + def _fetch_geojson(self, package_id: str) -> dict[str, Any]: + """Fetch GeoJSON data from a package. + + Args: + package_id: The package/dataset ID. + + Returns: + GeoJSON FeatureCollection. + """ + # Check cache first + if self._cache_dir: + cache_file = self._cache_dir / f"{package_id}.geojson" + if cache_file.exists(): + logger.debug(f"Loading {package_id} from cache") + with open(cache_file, encoding="utf-8") as f: + return dict(json.load(f)) + + url = self._get_resource_url(package_id, format_filter="geojson") + logger.info(f"Fetching GeoJSON from {url}") + + response = self.client.get(url) + response.raise_for_status() + data = response.json() + + # Cache the response + if self._cache_dir: + self._cache_dir.mkdir(parents=True, exist_ok=True) + cache_file = self._cache_dir / f"{package_id}.geojson" + with open(cache_file, "w", encoding="utf-8") as f: + json.dump(data, f) + + return dict(data) + + def _fetch_csv_as_json(self, package_id: str) -> list[dict[str, Any]]: + """Fetch CSV data as JSON records via CKAN datastore. + + Args: + package_id: The package/dataset ID. + + Returns: + List of records as dictionaries. + """ + package = self._get_package(package_id) + resources = package.get("resources", []) + + # Find a datastore-enabled resource + for resource in resources: + if resource.get("datastore_active"): + resource_id = resource["id"] + break + else: + raise ValueError(f"No datastore resource in {package_id}") + + # Fetch all records via datastore_search + records: list[dict[str, Any]] = [] + offset = 0 + limit = 1000 + + while True: + response = self.client.get( + f"{self.API_PATH}/datastore_search", + params={"id": resource_id, "limit": limit, "offset": offset}, + ) + response.raise_for_status() + result = response.json() + + if not result.get("success"): + raise ValueError(f"Datastore error: {result.get('error')}") + + batch = result["result"]["records"] + records.extend(batch) + + if len(batch) < limit: + break + offset += limit + + return records + + def get_neighbourhoods(self) -> list[NeighbourhoodRecord]: + """Fetch 158 Toronto neighbourhood boundaries. + + Returns: + List of validated NeighbourhoodRecord objects. + """ + geojson = self._fetch_geojson(self.DATASETS["neighbourhoods"]) + features = geojson.get("features", []) + + records = [] + for feature in features: + props = feature.get("properties", {}) + geometry = feature.get("geometry") + + # Extract area_id from various possible property names + area_id = props.get("AREA_ID") or props.get("area_id") + if area_id is None: + # Try AREA_SHORT_CODE as fallback + short_code = props.get("AREA_SHORT_CODE", "") + if short_code: + # Extract numeric part + area_id = int("".join(c for c in short_code if c.isdigit()) or "0") + + area_name = ( + props.get("AREA_NAME") + or props.get("area_name") + or f"Neighbourhood {area_id}" + ) + area_short_code = props.get("AREA_SHORT_CODE") or props.get( + "area_short_code" + ) + + records.append( + NeighbourhoodRecord( + area_id=int(area_id), + area_name=str(area_name), + area_short_code=area_short_code, + geometry=geometry, + ) + ) + + logger.info(f"Parsed {len(records)} neighbourhoods") + return records + + def get_census_profiles(self, year: int = 2021) -> list[CensusRecord]: + """Fetch neighbourhood census profiles. + + Note: Census profile data structure varies by year. This method + extracts key demographic indicators where available. + + Args: + year: Census year (2016 or 2021). + + Returns: + List of validated CensusRecord objects. + """ + # Census profiles are typically in CSV/datastore format + try: + raw_records = self._fetch_csv_as_json( + self.DATASETS["neighbourhood_profiles"] + ) + except ValueError as e: + logger.warning(f"Could not fetch census profiles: {e}") + return [] + + # Census profiles are pivoted - rows are indicators, columns are neighbourhoods + # This requires special handling based on the actual data structure + logger.info(f"Fetched {len(raw_records)} census profile rows") + + # For now, return empty list - actual implementation depends on data structure + # TODO: Implement census profile parsing based on actual data format + return [] + + def get_parks(self) -> list[AmenityRecord]: + """Fetch park locations. + + Returns: + List of validated AmenityRecord objects. + """ + return self._fetch_amenities( + self.DATASETS["parks"], + AmenityType.PARK, + name_field="ASSET_NAME", + address_field="ADDRESS_FULL", + ) + + def get_schools(self) -> list[AmenityRecord]: + """Fetch school locations. + + Returns: + List of validated AmenityRecord objects. + """ + return self._fetch_amenities( + self.DATASETS["schools"], + AmenityType.SCHOOL, + name_field="NAME", + address_field="ADDRESS_FULL", + ) + + def get_childcare_centres(self) -> list[AmenityRecord]: + """Fetch licensed childcare centre locations. + + Returns: + List of validated AmenityRecord objects. + """ + return self._fetch_amenities( + self.DATASETS["childcare"], + AmenityType.CHILDCARE, + name_field="LOC_NAME", + address_field="ADDRESS", + ) + + def _fetch_amenities( + self, + package_id: str, + amenity_type: AmenityType, + name_field: str, + address_field: str, + ) -> list[AmenityRecord]: + """Fetch and parse amenity data from GeoJSON. + + Args: + package_id: CKAN package ID. + amenity_type: Type of amenity. + name_field: Property name containing amenity name. + address_field: Property name containing address. + + Returns: + List of AmenityRecord objects. + """ + try: + geojson = self._fetch_geojson(package_id) + except (httpx.HTTPError, ValueError) as e: + logger.warning(f"Could not fetch {package_id}: {e}") + return [] + + features = geojson.get("features", []) + records = [] + + for feature in features: + props = feature.get("properties", {}) + geometry = feature.get("geometry") + + # Get coordinates from geometry + lat, lon = None, None + if geometry and geometry.get("type") == "Point": + coords = geometry.get("coordinates", []) + if len(coords) >= 2: + lon, lat = coords[0], coords[1] + + # Try to determine neighbourhood_id + # Many datasets include AREA_ID or similar + neighbourhood_id = ( + props.get("AREA_ID") + or props.get("area_id") + or props.get("NEIGHBOURHOOD_ID") + or 0 # Will need spatial join if not available + ) + + name = props.get(name_field) or props.get(name_field.lower()) or "Unknown" + address = props.get(address_field) or props.get(address_field.lower()) + + # Skip if we don't have a neighbourhood assignment + if neighbourhood_id == 0: + continue + + records.append( + AmenityRecord( + neighbourhood_id=int(neighbourhood_id), + amenity_type=amenity_type, + amenity_name=str(name)[:200], + address=str(address)[:300] if address else None, + latitude=Decimal(str(lat)) if lat else None, + longitude=Decimal(str(lon)) if lon else None, + ) + ) + + logger.info(f"Parsed {len(records)} {amenity_type.value} records") + return records diff --git a/portfolio_app/toronto/parsers/toronto_police.py b/portfolio_app/toronto/parsers/toronto_police.py new file mode 100644 index 0000000..2a5661b --- /dev/null +++ b/portfolio_app/toronto/parsers/toronto_police.py @@ -0,0 +1,371 @@ +"""Parser for Toronto Police crime data via CKAN API. + +Fetches neighbourhood crime rates and major crime indicators from the +Toronto Police Service data hosted on Toronto Open Data Portal. + +Data Sources: +- Neighbourhood Crime Rates: Annual crime rates by neighbourhood +- Major Crime Indicators (MCI): Detailed incident-level data +""" + +import contextlib +import logging +from decimal import Decimal +from typing import Any + +import httpx + +from portfolio_app.toronto.schemas import CrimeRecord, CrimeType + +logger = logging.getLogger(__name__) + + +# Mapping from Toronto Police crime categories to CrimeType enum +CRIME_TYPE_MAPPING: dict[str, CrimeType] = { + "assault": CrimeType.ASSAULT, + "assaults": CrimeType.ASSAULT, + "auto theft": CrimeType.AUTO_THEFT, + "autotheft": CrimeType.AUTO_THEFT, + "auto_theft": CrimeType.AUTO_THEFT, + "break and enter": CrimeType.BREAK_AND_ENTER, + "breakenter": CrimeType.BREAK_AND_ENTER, + "break_and_enter": CrimeType.BREAK_AND_ENTER, + "homicide": CrimeType.HOMICIDE, + "homicides": CrimeType.HOMICIDE, + "robbery": CrimeType.ROBBERY, + "robberies": CrimeType.ROBBERY, + "shooting": CrimeType.SHOOTING, + "shootings": CrimeType.SHOOTING, + "theft over": CrimeType.THEFT_OVER, + "theftover": CrimeType.THEFT_OVER, + "theft_over": CrimeType.THEFT_OVER, + "theft from motor vehicle": CrimeType.THEFT_FROM_MOTOR_VEHICLE, + "theftfrommv": CrimeType.THEFT_FROM_MOTOR_VEHICLE, + "theft_from_mv": CrimeType.THEFT_FROM_MOTOR_VEHICLE, +} + + +def _normalize_crime_type(crime_str: str) -> CrimeType: + """Normalize crime type string to CrimeType enum. + + Args: + crime_str: Raw crime type string from data source. + + Returns: + Matched CrimeType enum value, or CrimeType.OTHER if no match. + """ + normalized = crime_str.lower().strip().replace("-", " ").replace("_", " ") + return CRIME_TYPE_MAPPING.get(normalized, CrimeType.OTHER) + + +class TorontoPoliceParser: + """Parser for Toronto Police crime data via CKAN API. + + Crime data is hosted on Toronto Open Data Portal but sourced from + Toronto Police Service. + """ + + BASE_URL = "https://ckan0.cf.opendata.inter.prod-toronto.ca" + API_PATH = "/api/3/action" + + # Dataset package IDs + DATASETS = { + "crime_rates": "neighbourhood-crime-rates", + "mci": "major-crime-indicators", + "shootings": "shootings-firearm-discharges", + } + + def __init__(self, timeout: float = 30.0) -> None: + """Initialize parser. + + Args: + timeout: HTTP request timeout in seconds. + """ + self._timeout = timeout + self._client: httpx.Client | None = None + + @property + def client(self) -> httpx.Client: + """Lazy-initialize HTTP client.""" + if self._client is None: + self._client = httpx.Client( + base_url=self.BASE_URL, + timeout=self._timeout, + headers={"Accept": "application/json"}, + ) + return self._client + + def close(self) -> None: + """Close HTTP client.""" + if self._client is not None: + self._client.close() + self._client = None + + def __enter__(self) -> "TorontoPoliceParser": + return self + + def __exit__(self, *args: Any) -> None: + self.close() + + def _get_package(self, package_id: str) -> dict[str, Any]: + """Fetch package metadata from CKAN.""" + response = self.client.get( + f"{self.API_PATH}/package_show", + params={"id": package_id}, + ) + response.raise_for_status() + result = response.json() + + if not result.get("success"): + raise ValueError(f"CKAN API error: {result.get('error', 'Unknown error')}") + + return dict(result["result"]) + + def _fetch_datastore_records( + self, + package_id: str, + filters: dict[str, Any] | None = None, + ) -> list[dict[str, Any]]: + """Fetch records from CKAN datastore. + + Args: + package_id: CKAN package ID. + filters: Optional filters to apply. + + Returns: + List of records as dictionaries. + """ + package = self._get_package(package_id) + resources = package.get("resources", []) + + # Find datastore-enabled resource + resource_id = None + for resource in resources: + if resource.get("datastore_active"): + resource_id = resource["id"] + break + + if not resource_id: + raise ValueError(f"No datastore resource in {package_id}") + + # Fetch all records + records: list[dict[str, Any]] = [] + offset = 0 + limit = 1000 + + while True: + params: dict[str, Any] = { + "id": resource_id, + "limit": limit, + "offset": offset, + } + if filters: + params["filters"] = str(filters) + + response = self.client.get( + f"{self.API_PATH}/datastore_search", + params=params, + ) + response.raise_for_status() + result = response.json() + + if not result.get("success"): + raise ValueError(f"Datastore error: {result.get('error')}") + + batch = result["result"]["records"] + records.extend(batch) + + if len(batch) < limit: + break + offset += limit + + return records + + def get_crime_rates( + self, + years: list[int] | None = None, + ) -> list[CrimeRecord]: + """Fetch neighbourhood crime rates. + + The crime rates dataset contains annual counts and rates per 100k + population for each neighbourhood. + + Args: + years: Optional list of years to filter. If None, fetches all. + + Returns: + List of validated CrimeRecord objects. + """ + try: + raw_records = self._fetch_datastore_records(self.DATASETS["crime_rates"]) + except (httpx.HTTPError, ValueError) as e: + logger.warning(f"Could not fetch crime rates: {e}") + return [] + + records = [] + + for row in raw_records: + # Extract neighbourhood ID (Hood_ID maps to AREA_ID) + hood_id = row.get("HOOD_ID") or row.get("Hood_ID") or row.get("hood_id") + if not hood_id: + continue + + try: + neighbourhood_id = int(hood_id) + except (ValueError, TypeError): + continue + + # Crime rate data typically has columns like: + # ASSAULT_2019, ASSAULT_RATE_2019, AUTOTHEFT_2020, etc. + # We need to parse column names to extract crime type and year + + for col_name, value in row.items(): + if value is None or col_name in ( + "_id", + "HOOD_ID", + "Hood_ID", + "hood_id", + "AREA_NAME", + "NEIGHBOURHOOD", + ): + continue + + # Try to parse column name for crime type and year + # Pattern: CRIMETYPE_YEAR or CRIMETYPE_RATE_YEAR + parts = col_name.upper().split("_") + if len(parts) < 2: + continue + + # Check if last part is a year + try: + year = int(parts[-1]) + if year < 2014 or year > 2030: + continue + except ValueError: + continue + + # Filter by years if specified + if years and year not in years: + continue + + # Check if this is a rate column + is_rate = "RATE" in parts + + # Extract crime type (everything before RATE/year) + if is_rate: + rate_idx = parts.index("RATE") + crime_type_str = "_".join(parts[:rate_idx]) + else: + crime_type_str = "_".join(parts[:-1]) + + crime_type = _normalize_crime_type(crime_type_str) + + try: + numeric_value = Decimal(str(value)) + except (ValueError, TypeError): + continue + + if is_rate: + # This is a rate column - look for corresponding count + # We'll skip rate-only entries and create records from counts + continue + + # Find corresponding rate if available + rate_col = f"{crime_type_str}_RATE_{year}" + rate_value = row.get(rate_col) + rate_per_100k = None + if rate_value is not None: + with contextlib.suppress(ValueError, TypeError): + rate_per_100k = Decimal(str(rate_value)) + + records.append( + CrimeRecord( + neighbourhood_id=neighbourhood_id, + year=year, + crime_type=crime_type, + count=int(numeric_value), + rate_per_100k=rate_per_100k, + ) + ) + + logger.info(f"Parsed {len(records)} crime rate records") + return records + + def get_major_crime_indicators( + self, + years: list[int] | None = None, + ) -> list[CrimeRecord]: + """Fetch major crime indicators (detailed MCI data). + + MCI data contains incident-level records that need to be aggregated + by neighbourhood and year. + + Args: + years: Optional list of years to filter. + + Returns: + List of aggregated CrimeRecord objects. + """ + try: + raw_records = self._fetch_datastore_records(self.DATASETS["mci"]) + except (httpx.HTTPError, ValueError) as e: + logger.warning(f"Could not fetch MCI data: {e}") + return [] + + # Aggregate counts by neighbourhood, year, and crime type + aggregates: dict[tuple[int, int, CrimeType], int] = {} + + for row in raw_records: + # Extract neighbourhood ID + hood_id = ( + row.get("HOOD_158") + or row.get("HOOD_140") + or row.get("HOOD_ID") + or row.get("Hood_ID") + ) + if not hood_id: + continue + + try: + neighbourhood_id = int(hood_id) + except (ValueError, TypeError): + continue + + # Extract year from occurrence date + occ_year = row.get("OCC_YEAR") or row.get("REPORT_YEAR") + if not occ_year: + continue + + try: + year = int(occ_year) + if year < 2014 or year > 2030: + continue + except (ValueError, TypeError): + continue + + # Filter by years if specified + if years and year not in years: + continue + + # Extract crime type + mci_category = row.get("MCI_CATEGORY") or row.get("OFFENCE") or "" + crime_type = _normalize_crime_type(str(mci_category)) + + # Aggregate count + key = (neighbourhood_id, year, crime_type) + aggregates[key] = aggregates.get(key, 0) + 1 + + # Convert aggregates to CrimeRecord objects + records = [ + CrimeRecord( + neighbourhood_id=neighbourhood_id, + year=year, + crime_type=crime_type, + count=count, + rate_per_100k=None, # Would need population data to calculate + ) + for (neighbourhood_id, year, crime_type), count in aggregates.items() + ] + + logger.info(f"Parsed {len(records)} MCI records (aggregated)") + return records diff --git a/portfolio_app/toronto/schemas/__init__.py b/portfolio_app/toronto/schemas/__init__.py index 0a470a8..897f6c4 100644 --- a/portfolio_app/toronto/schemas/__init__.py +++ b/portfolio_app/toronto/schemas/__init__.py @@ -1,5 +1,6 @@ """Pydantic schemas for Toronto housing data validation.""" +from .amenities import AmenityCount, AmenityRecord, AmenityType from .cmhc import BedroomType, CMHCAnnualSurvey, CMHCRentalRecord, ReliabilityCode from .dimensions import ( CMHCZone, @@ -11,6 +12,7 @@ from .dimensions import ( PolicyLevel, TimeDimension, ) +from .neighbourhood import CensusRecord, CrimeRecord, CrimeType, NeighbourhoodRecord __all__ = [ # CMHC @@ -28,4 +30,13 @@ __all__ = [ "PolicyCategory", "ExpectedDirection", "Confidence", + # Neighbourhood data (Phase 3) + "NeighbourhoodRecord", + "CensusRecord", + "CrimeRecord", + "CrimeType", + # Amenities (Phase 3) + "AmenityType", + "AmenityRecord", + "AmenityCount", ] diff --git a/portfolio_app/toronto/schemas/amenities.py b/portfolio_app/toronto/schemas/amenities.py new file mode 100644 index 0000000..2922d61 --- /dev/null +++ b/portfolio_app/toronto/schemas/amenities.py @@ -0,0 +1,60 @@ +"""Pydantic schemas for Toronto amenities data. + +Includes schemas for parks, schools, childcare centres, and transit stops. +""" + +from decimal import Decimal +from enum import Enum + +from pydantic import BaseModel, Field + + +class AmenityType(str, Enum): + """Types of amenities tracked in the neighbourhood dashboard.""" + + PARK = "park" + SCHOOL = "school" + CHILDCARE = "childcare" + TRANSIT_STOP = "transit_stop" + LIBRARY = "library" + COMMUNITY_CENTRE = "community_centre" + HOSPITAL = "hospital" + + +class AmenityRecord(BaseModel): + """Amenity location record for a neighbourhood. + + Represents a single amenity (park, school, etc.) with its location + and associated neighbourhood. + """ + + neighbourhood_id: int = Field( + ge=1, le=200, description="Neighbourhood ID containing this amenity" + ) + amenity_type: AmenityType = Field(description="Type of amenity") + amenity_name: str = Field(max_length=200, description="Name of the amenity") + address: str | None = Field( + default=None, max_length=300, description="Street address" + ) + latitude: Decimal | None = Field( + default=None, ge=-90, le=90, description="Latitude (WGS84)" + ) + longitude: Decimal | None = Field( + default=None, ge=-180, le=180, description="Longitude (WGS84)" + ) + + model_config = {"str_strip_whitespace": True} + + +class AmenityCount(BaseModel): + """Aggregated amenity count for a neighbourhood. + + Used for dashboard metrics showing amenity density per neighbourhood. + """ + + neighbourhood_id: int = Field(ge=1, le=200, description="Neighbourhood ID") + amenity_type: AmenityType = Field(description="Type of amenity") + count: int = Field(ge=0, description="Number of amenities of this type") + year: int = Field(ge=2020, le=2030, description="Year of data snapshot") + + model_config = {"str_strip_whitespace": True} diff --git a/portfolio_app/toronto/schemas/neighbourhood.py b/portfolio_app/toronto/schemas/neighbourhood.py new file mode 100644 index 0000000..2a1c567 --- /dev/null +++ b/portfolio_app/toronto/schemas/neighbourhood.py @@ -0,0 +1,106 @@ +"""Pydantic schemas for Toronto neighbourhood data. + +Includes schemas for neighbourhood boundaries, census profiles, and crime statistics. +""" + +from decimal import Decimal +from enum import Enum +from typing import Any + +from pydantic import BaseModel, Field + + +class CrimeType(str, Enum): + """Major crime indicator types from Toronto Police data.""" + + ASSAULT = "assault" + AUTO_THEFT = "auto_theft" + BREAK_AND_ENTER = "break_and_enter" + HOMICIDE = "homicide" + ROBBERY = "robbery" + SHOOTING = "shooting" + THEFT_OVER = "theft_over" + THEFT_FROM_MOTOR_VEHICLE = "theft_from_motor_vehicle" + OTHER = "other" + + +class NeighbourhoodRecord(BaseModel): + """Schema for Toronto neighbourhood boundary data. + + Based on City of Toronto's 158 neighbourhoods dataset. + AREA_ID maps to neighbourhood_id for consistency with police data (Hood_ID). + """ + + area_id: int = Field(description="AREA_ID from Toronto Open Data (1-158)") + area_name: str = Field(max_length=100, description="Official neighbourhood name") + area_short_code: str | None = Field( + default=None, max_length=10, description="Short code (e.g., 'E01')" + ) + geometry: dict[str, Any] | None = Field( + default=None, description="GeoJSON geometry object" + ) + + model_config = {"str_strip_whitespace": True} + + +class CensusRecord(BaseModel): + """Census profile data for a neighbourhood. + + Contains demographic and socioeconomic indicators from Statistics Canada + census data, aggregated to the neighbourhood level. + """ + + neighbourhood_id: int = Field( + ge=1, le=200, description="Neighbourhood ID (AREA_ID)" + ) + census_year: int = Field(ge=2016, le=2030, description="Census year") + population: int | None = Field(default=None, ge=0, description="Total population") + population_density: Decimal | None = Field( + default=None, ge=0, description="Population per square kilometre" + ) + median_household_income: Decimal | None = Field( + default=None, ge=0, description="Median household income (CAD)" + ) + average_household_income: Decimal | None = Field( + default=None, ge=0, description="Average household income (CAD)" + ) + unemployment_rate: Decimal | None = Field( + default=None, ge=0, le=100, description="Unemployment rate percentage" + ) + pct_bachelors_or_higher: Decimal | None = Field( + default=None, ge=0, le=100, description="Percentage with bachelor's degree+" + ) + pct_owner_occupied: Decimal | None = Field( + default=None, ge=0, le=100, description="Percentage owner-occupied dwellings" + ) + pct_renter_occupied: Decimal | None = Field( + default=None, ge=0, le=100, description="Percentage renter-occupied dwellings" + ) + median_age: Decimal | None = Field( + default=None, ge=0, le=120, description="Median age of residents" + ) + average_dwelling_value: Decimal | None = Field( + default=None, ge=0, description="Average dwelling value (CAD)" + ) + + model_config = {"str_strip_whitespace": True} + + +class CrimeRecord(BaseModel): + """Crime statistics for a neighbourhood. + + Based on Toronto Police neighbourhood crime rates data. + Hood_ID in source data maps to neighbourhood_id (AREA_ID). + """ + + neighbourhood_id: int = Field( + ge=1, le=200, description="Neighbourhood ID (Hood_ID -> AREA_ID)" + ) + year: int = Field(ge=2014, le=2030, description="Year of crime statistics") + crime_type: CrimeType = Field(description="Type of crime (MCI category)") + count: int = Field(ge=0, description="Number of incidents") + rate_per_100k: Decimal | None = Field( + default=None, ge=0, description="Rate per 100,000 population" + ) + + model_config = {"str_strip_whitespace": True}