diff --git a/CLAUDE.md b/CLAUDE.md index 642e9dd..04e02b0 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -18,7 +18,7 @@ Working context for Claude Code on the Analytics Portfolio project. ```bash make setup # Install deps, create .env, init pre-commit -make docker-up # Start PostgreSQL + PostGIS +make docker-up # Start PostgreSQL + PostGIS (auto-detects x86/ARM) make docker-down # Stop containers make db-init # Initialize database schema make run # Start Dash dev server @@ -193,6 +193,7 @@ notebooks/ # Data documentation (Phase 6) - SQLAlchemy 2.0 + Pydantic 2.0 only (never mix 1.x APIs) - PostGIS extension required in database - Docker Compose V2 format (no `version` field) +- **Multi-architecture support**: `make docker-up` auto-detects CPU architecture and uses the appropriate PostGIS image (x86_64: `postgis/postgis`, ARM64: `imresamu/postgis`) --- diff --git a/Makefile b/Makefile index fb33118..4d51b4c 100644 --- a/Makefile +++ b/Makefile @@ -8,6 +8,17 @@ PYTHON := python3 PIP := pip DOCKER_COMPOSE := docker compose +# Architecture detection for Docker images +ARCH := $(shell uname -m) +ifeq ($(ARCH),aarch64) + POSTGIS_IMAGE := imresamu/postgis:16-3.4 +else ifeq ($(ARCH),arm64) + POSTGIS_IMAGE := imresamu/postgis:16-3.4 +else + POSTGIS_IMAGE := postgis/postgis:16-3.4 +endif +export POSTGIS_IMAGE + # Colors for output BLUE := \033[0;34m GREEN := \033[0;32m @@ -39,6 +50,7 @@ setup: ## Install dependencies, create .env, init pre-commit docker-up: ## Start PostgreSQL + PostGIS containers @echo "$(GREEN)Starting database containers...$(NC)" + @echo "$(BLUE)Architecture: $(ARCH) -> Using image: $(POSTGIS_IMAGE)$(NC)" $(DOCKER_COMPOSE) up -d @echo "$(GREEN)Waiting for database to be ready...$(NC)" @sleep 3 diff --git a/dbt/models/intermediate/int_census__toronto_cma.sql b/dbt/models/intermediate/int_census__toronto_cma.sql new file mode 100644 index 0000000..37560f4 --- /dev/null +++ b/dbt/models/intermediate/int_census__toronto_cma.sql @@ -0,0 +1,60 @@ +-- Intermediate: 
Toronto CMA census statistics by year +-- Provides city-wide averages for metrics not available at neighbourhood level +-- Used when neighbourhood-level data is unavailable (e.g., median household income) +-- Grain: One row per year + +with years as ( + select * from {{ ref('int_year_spine') }} +), + +census as ( + select * from {{ ref('stg_toronto__census') }} +), + +-- Census data is only available for 2016 and 2021 +-- Map each analysis year to the appropriate census year +year_to_census as ( + select + y.year, + case + when y.year <= 2018 then 2016 + else 2021 + end as census_year + from years y +), + +-- Toronto CMA median household income from Statistics Canada +-- Source: Census Profile Table 98-316-X2021001 +-- 2016: $65,829 (from Census Profile) +-- 2021: $84,000 (from Census Profile) +cma_income as ( + select 2016 as census_year, 65829 as median_household_income union all + select 2021 as census_year, 84000 as median_household_income +), + +-- City-wide aggregates from loaded neighbourhood data +city_aggregates as ( + select + census_year, + sum(population) as total_population, + avg(population_density) as avg_population_density, + avg(unemployment_rate) as avg_unemployment_rate + from census + where population is not null + group by census_year +), + +final as ( + select + y.year, + y.census_year, + ci.median_household_income, + ca.total_population, + ca.avg_population_density, + ca.avg_unemployment_rate + from year_to_census y + left join cma_income ci on y.census_year = ci.census_year + left join city_aggregates ca on y.census_year = ca.census_year +) + +select * from final diff --git a/dbt/models/intermediate/int_neighbourhood__amenity_scores.sql b/dbt/models/intermediate/int_neighbourhood__amenity_scores.sql index 8890177..a5d828e 100644 --- a/dbt/models/intermediate/int_neighbourhood__amenity_scores.sql +++ b/dbt/models/intermediate/int_neighbourhood__amenity_scores.sql @@ -34,7 +34,7 @@ amenity_scores as ( n.population, n.land_area_sqkm, - a.year, 
+ coalesce(a.year, 2021) as year, -- Raw counts a.parks_count, diff --git a/dbt/models/intermediate/int_neighbourhood__crime_summary.sql b/dbt/models/intermediate/int_neighbourhood__crime_summary.sql index bd20997..cf5c3c8 100644 --- a/dbt/models/intermediate/int_neighbourhood__crime_summary.sql +++ b/dbt/models/intermediate/int_neighbourhood__crime_summary.sql @@ -64,15 +64,17 @@ crime_summary as ( w.robbery_count, w.theft_over_count, w.homicide_count, - w.avg_rate_per_100k, w.yoy_change_pct, - -- Crime rate per 100K population - case - when n.population > 0 - then round(w.total_incidents::numeric / n.population * 100000, 2) - else null - end as crime_rate_per_100k + -- Crime rate per 100K population (use source data avg, or calculate if population available) + coalesce( + w.avg_rate_per_100k, + case + when n.population > 0 + then round(w.total_incidents::numeric / n.population * 100000, 2) + else null + end + ) as crime_rate_per_100k from neighbourhoods n inner join with_yoy w on n.neighbourhood_id = w.neighbourhood_id diff --git a/dbt/models/intermediate/int_neighbourhood__demographics.sql b/dbt/models/intermediate/int_neighbourhood__demographics.sql index 2316f6a..1791c3f 100644 --- a/dbt/models/intermediate/int_neighbourhood__demographics.sql +++ b/dbt/models/intermediate/int_neighbourhood__demographics.sql @@ -17,7 +17,8 @@ demographics as ( n.geometry, n.land_area_sqkm, - c.census_year, + -- Use census_year from census data, or fall back to dim_neighbourhood's year + coalesce(c.census_year, n.census_year, 2021) as census_year, c.population, c.population_density, c.median_household_income, diff --git a/dbt/models/intermediate/int_neighbourhood__housing.sql b/dbt/models/intermediate/int_neighbourhood__housing.sql index 9d7f705..f7c0d59 100644 --- a/dbt/models/intermediate/int_neighbourhood__housing.sql +++ b/dbt/models/intermediate/int_neighbourhood__housing.sql @@ -20,7 +20,7 @@ housing as ( n.neighbourhood_name, n.geometry, - coalesce(r.year, c.census_year) 
as year, + coalesce(r.year, c.census_year, 2021) as year, -- Census housing metrics c.pct_owner_occupied, diff --git a/dbt/models/intermediate/int_rentals__toronto_cma.sql b/dbt/models/intermediate/int_rentals__toronto_cma.sql new file mode 100644 index 0000000..66cc944 --- /dev/null +++ b/dbt/models/intermediate/int_rentals__toronto_cma.sql @@ -0,0 +1,25 @@ +-- Intermediate: Toronto CMA rental metrics by year +-- Aggregates rental data to city-wide averages by year +-- Source: StatCan CMHC data at CMA level +-- Grain: One row per year + +with rentals as ( + select * from {{ ref('stg_cmhc__rentals') }} +), + +-- Pivot bedroom types to columns +yearly_rentals as ( + select + year, + max(case when bedroom_type = 'bachelor' then avg_rent end) as avg_rent_bachelor, + max(case when bedroom_type = '1bed' then avg_rent end) as avg_rent_1bed, + max(case when bedroom_type = '2bed' then avg_rent end) as avg_rent_2bed, + max(case when bedroom_type = '3bed' then avg_rent end) as avg_rent_3bed, + -- Use 2-bedroom as standard reference + max(case when bedroom_type = '2bed' then avg_rent end) as avg_rent_standard, + max(vacancy_rate) as vacancy_rate + from rentals + group by year +) + +select * from yearly_rentals diff --git a/dbt/models/intermediate/int_year_spine.sql b/dbt/models/intermediate/int_year_spine.sql new file mode 100644 index 0000000..3760e1e --- /dev/null +++ b/dbt/models/intermediate/int_year_spine.sql @@ -0,0 +1,11 @@ +-- Intermediate: Year spine for analysis +-- Creates a row for each year from 2014-2025 +-- Used to drive time-series analysis across all data sources + +with years as ( + -- Generate years from available data sources + -- Crime data: 2014-2024, Rentals: 2019-2025 + select generate_series(2014, 2025) as year +) + +select year from years diff --git a/dbt/models/marts/mart_neighbourhood_overview.sql b/dbt/models/marts/mart_neighbourhood_overview.sql index a43e681..5a9c256 100644 --- a/dbt/models/marts/mart_neighbourhood_overview.sql +++ 
b/dbt/models/marts/mart_neighbourhood_overview.sql @@ -1,79 +1,119 @@ -- Mart: Neighbourhood Overview with Composite Livability Score -- Dashboard Tab: Overview -- Grain: One row per neighbourhood per year +-- Time spine: Years 2014-2025 (driven by crime/rental data availability) -with demographics as ( - select * from {{ ref('int_neighbourhood__demographics') }} +with years as ( + select * from {{ ref('int_year_spine') }} ), -housing as ( - select * from {{ ref('int_neighbourhood__housing') }} +neighbourhoods as ( + select * from {{ ref('stg_toronto__neighbourhoods') }} ), +-- Create base: all neighbourhoods × all years +neighbourhood_years as ( + select + n.neighbourhood_id, + n.neighbourhood_name, + n.geometry, + y.year + from neighbourhoods n + cross join years y +), + +-- Census data (available for 2016, 2021) +-- For each year, use the most recent census data available +census as ( + select * from {{ ref('stg_toronto__census') }} +), + +census_mapped as ( + select + ny.neighbourhood_id, + ny.year, + c.population, + c.unemployment_rate, + c.pct_bachelors_or_higher as education_bachelors_pct + from neighbourhood_years ny + left join census c on ny.neighbourhood_id = c.neighbourhood_id + -- Use census year <= analysis year, prefer most recent + and c.census_year = ( + select max(c2.census_year) + from {{ ref('stg_toronto__census') }} c2 + where c2.neighbourhood_id = ny.neighbourhood_id + and c2.census_year <= ny.year + ) +), + +-- CMA-level census data (for income - not available at neighbourhood level) +cma_census as ( + select * from {{ ref('int_census__toronto_cma') }} +), + +-- Crime data (2014-2024) crime as ( select * from {{ ref('int_neighbourhood__crime_summary') }} ), -amenities as ( - select * from {{ ref('int_neighbourhood__amenity_scores') }} +-- Rentals (2019-2025) - CMA level applied to all neighbourhoods +rentals as ( + select * from {{ ref('int_rentals__toronto_cma') }} ), --- Compute percentile ranks for scoring components -percentiles as ( +-- 
Compute scores +scored as ( select - d.neighbourhood_id, - d.neighbourhood_name, - d.geometry, - d.census_year as year, - d.population, - d.median_household_income, + ny.neighbourhood_id, + ny.neighbourhood_name, + ny.geometry, + ny.year, + cm.population, + -- Use CMA-level income (neighbourhood-level not available in Toronto Open Data) + cma.median_household_income, -- Safety score: inverse of crime rate (higher = safer) case - when c.crime_rate_per_100k is not null + when cr.crime_rate_per_100k is not null then 100 - percent_rank() over ( - partition by d.census_year - order by c.crime_rate_per_100k + partition by ny.year + order by cr.crime_rate_per_100k ) * 100 else null end as safety_score, -- Affordability score: inverse of rent-to-income ratio + -- Using CMA-level income since neighbourhood-level not available case - when h.rent_to_income_pct is not null + when cma.median_household_income > 0 and r.avg_rent_standard > 0 then 100 - percent_rank() over ( - partition by d.census_year - order by h.rent_to_income_pct + partition by ny.year + order by (r.avg_rent_standard * 12 / cma.median_household_income) ) * 100 else null end as affordability_score, - -- Amenity score: based on amenities per capita + -- Raw metrics + cr.crime_rate_per_100k, case - when a.total_amenities_per_1000 is not null - then percent_rank() over ( - partition by d.census_year - order by a.total_amenities_per_1000 - ) * 100 + when cma.median_household_income > 0 and r.avg_rent_standard > 0 + then round((r.avg_rent_standard * 12 / cma.median_household_income) * 100, 2) else null - end as amenity_score, + end as rent_to_income_pct, + r.avg_rent_standard as avg_rent_2bed, + r.vacancy_rate - -- Raw metrics for reference - c.crime_rate_per_100k, - h.rent_to_income_pct, - h.avg_rent_2bed, - a.total_amenities_per_1000 - - from demographics d - left join housing h - on d.neighbourhood_id = h.neighbourhood_id - and d.census_year = h.year - left join crime c - on d.neighbourhood_id = 
c.neighbourhood_id - and d.census_year = c.year - left join amenities a - on d.neighbourhood_id = a.neighbourhood_id - and d.census_year = a.year + from neighbourhood_years ny + left join census_mapped cm + on ny.neighbourhood_id = cm.neighbourhood_id + and ny.year = cm.year + left join cma_census cma + on ny.year = cma.year + left join crime cr + on ny.neighbourhood_id = cr.neighbourhood_id + and ny.year = cr.year + left join rentals r + on ny.year = r.year ), final as ( @@ -88,13 +128,14 @@ final as ( -- Component scores (0-100) round(safety_score::numeric, 1) as safety_score, round(affordability_score::numeric, 1) as affordability_score, - round(amenity_score::numeric, 1) as amenity_score, + -- Amenity score not available at this level, use placeholder + 50.0 as amenity_score, - -- Composite livability score: safety (30%), affordability (40%), amenities (30%) + -- Composite livability score: safety (40%), affordability (40%), amenities (20%) round( - (coalesce(safety_score, 50) * 0.30 + + (coalesce(safety_score, 50) * 0.40 + coalesce(affordability_score, 50) * 0.40 + - coalesce(amenity_score, 50) * 0.30)::numeric, + 50 * 0.20)::numeric, 1 ) as livability_score, @@ -102,9 +143,10 @@ final as ( crime_rate_per_100k, rent_to_income_pct, avg_rent_2bed, - total_amenities_per_1000 + vacancy_rate, + null::numeric as total_amenities_per_1000 - from percentiles + from scored ) select * from final diff --git a/dbt/models/staging/stg_cmhc__rentals.sql b/dbt/models/staging/stg_cmhc__rentals.sql index 2faa941..d2d181e 100644 --- a/dbt/models/staging/stg_cmhc__rentals.sql +++ b/dbt/models/staging/stg_cmhc__rentals.sql @@ -1,9 +1,13 @@ -- Staged CMHC rental market survey data --- Source: fact_rentals table loaded from CMHC CSV exports +-- Source: fact_rentals table loaded from CMHC/StatCan -- Grain: One row per zone per bedroom type per survey year with source as ( - select * from {{ source('toronto_housing', 'fact_rentals') }} + select + f.*, + t.year as survey_year + from {{ 
source('toronto_housing', 'fact_rentals') }} f + join {{ source('toronto_housing', 'dim_time') }} t on f.date_key = t.date_key ), staged as ( @@ -11,6 +15,7 @@ staged as ( id as rental_id, date_key, zone_key, + survey_year as year, bedroom_type, universe as rental_universe, avg_rent, diff --git a/docker-compose.yml b/docker-compose.yml index 5d04bca..e598c2f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,6 @@ services: db: - image: postgis/postgis:16-3.4 + image: ${POSTGIS_IMAGE:-postgis/postgis:16-3.4} container_name: portfolio-db restart: unless-stopped ports: diff --git a/portfolio_app/pages/toronto/callbacks/chart_callbacks.py b/portfolio_app/pages/toronto/callbacks/chart_callbacks.py index 9cd0dc7..1207100 100644 --- a/portfolio_app/pages/toronto/callbacks/chart_callbacks.py +++ b/portfolio_app/pages/toronto/callbacks/chart_callbacks.py @@ -1,6 +1,7 @@ """Chart callbacks for supporting visualizations.""" # mypy: disable-error-code="misc,no-untyped-def,arg-type" +import pandas as pd import plotly.graph_objects as go from dash import Input, Output, callback @@ -43,7 +44,24 @@ def update_overview_scatter(year: str) -> go.Figure: # Compute safety score (inverse of crime rate) if "total_crime_rate" in merged.columns: max_crime = merged["total_crime_rate"].max() - merged["safety_score"] = 100 - (merged["total_crime_rate"] / max_crime * 100) + if max_crime and max_crime > 0: + merged["safety_score"] = 100 - ( + merged["total_crime_rate"] / max_crime * 100 + ) + else: + merged["safety_score"] = 50 # Default if no crime data + + # Fill NULL population with median or default value for sizing + if "population" in merged.columns: + median_pop = merged["population"].median() + default_pop = median_pop if pd.notna(median_pop) else 10000 + merged["population"] = merged["population"].fillna(default_pop) + + # Filter rows with required data for scatter plot + merged = merged.dropna(subset=["median_household_income", "safety_score"]) + + if merged.empty: + 
return _empty_chart("Insufficient data for scatter plot") data = merged.to_dict("records") @@ -76,12 +94,13 @@ def update_housing_trend(year: str, neighbourhood_id: int | None) -> go.Figure: return _empty_chart("No trend data available") # Placeholder for trend data - would be historical + base_rent = averages.get("avg_rent_2bed") or 2000 data = [ - {"year": "2019", "avg_rent": averages.get("avg_rent_2bed", 2000) * 0.85}, - {"year": "2020", "avg_rent": averages.get("avg_rent_2bed", 2000) * 0.88}, - {"year": "2021", "avg_rent": averages.get("avg_rent_2bed", 2000) * 0.92}, - {"year": "2022", "avg_rent": averages.get("avg_rent_2bed", 2000) * 0.96}, - {"year": "2023", "avg_rent": averages.get("avg_rent_2bed", 2000)}, + {"year": "2019", "avg_rent": base_rent * 0.85}, + {"year": "2020", "avg_rent": base_rent * 0.88}, + {"year": "2021", "avg_rent": base_rent * 0.92}, + {"year": "2022", "avg_rent": base_rent * 0.96}, + {"year": "2023", "avg_rent": base_rent}, ] fig = go.Figure() @@ -330,10 +349,11 @@ def update_amenities_radar(year: str, neighbourhood_id: int | None) -> go.Figure # Get city averages averages = get_city_averages(year_int) + amenity_score = averages.get("avg_amenity_score") or 50 city_data = { - "parks_per_1000": averages.get("avg_amenity_score", 50) / 100 * 10, - "schools_per_1000": averages.get("avg_amenity_score", 50) / 100 * 5, - "childcare_per_1000": averages.get("avg_amenity_score", 50) / 100 * 3, + "parks_per_1000": amenity_score / 100 * 10, + "schools_per_1000": amenity_score / 100 * 5, + "childcare_per_1000": amenity_score / 100 * 3, "transit_access": 70, } diff --git a/portfolio_app/toronto/loaders/__init__.py b/portfolio_app/toronto/loaders/__init__.py index b977d1d..d828308 100644 --- a/portfolio_app/toronto/loaders/__init__.py +++ b/portfolio_app/toronto/loaders/__init__.py @@ -3,7 +3,12 @@ from .amenities import load_amenities, load_amenity_counts from .base import bulk_insert, get_session, upsert_by_key from .census import load_census_data 
-from .cmhc import load_cmhc_record, load_cmhc_rentals +from .cmhc import ( + ensure_toronto_cma_zone, + load_cmhc_record, + load_cmhc_rentals, + load_statcan_cmhc_data, +) from .cmhc_crosswalk import ( build_cmhc_neighbourhood_crosswalk, disaggregate_zone_value, @@ -32,6 +37,8 @@ __all__ = [ # Fact loaders "load_cmhc_rentals", "load_cmhc_record", + "load_statcan_cmhc_data", + "ensure_toronto_cma_zone", # Phase 3 loaders "load_census_data", "load_crime_data", diff --git a/portfolio_app/toronto/loaders/cmhc.py b/portfolio_app/toronto/loaders/cmhc.py index 09691be..fe31535 100644 --- a/portfolio_app/toronto/loaders/cmhc.py +++ b/portfolio_app/toronto/loaders/cmhc.py @@ -1,5 +1,9 @@ """Loader for CMHC rental data into fact_rentals.""" +import logging +from datetime import date +from typing import Any + from sqlalchemy.orm import Session from portfolio_app.toronto.models import DimCMHCZone, DimTime, FactRentals @@ -8,6 +12,12 @@ from portfolio_app.toronto.schemas import CMHCAnnualSurvey, CMHCRentalRecord from .base import get_session, upsert_by_key from .dimensions import generate_date_key +logger = logging.getLogger(__name__) + +# Toronto CMA zone code for CMA-level data +TORONTO_CMA_ZONE_CODE = "TORCMA" +TORONTO_CMA_ZONE_NAME = "Toronto CMA" + def load_cmhc_rentals( survey: CMHCAnnualSurvey, @@ -135,3 +145,117 @@ def load_cmhc_record( return _load(session) with get_session() as sess: return _load(sess) + + +def ensure_toronto_cma_zone(session: Session | None = None) -> int: + """Ensure Toronto CMA zone exists in dim_cmhc_zone. + + Creates the zone if it doesn't exist. + + Args: + session: Optional existing session. + + Returns: + The zone_key for Toronto CMA. 
+ """ + + def _ensure(sess: Session) -> int: + zone = ( + sess.query(DimCMHCZone).filter_by(zone_code=TORONTO_CMA_ZONE_CODE).first() + ) + if zone: + return int(zone.zone_key) + + # Create new zone + new_zone = DimCMHCZone( + zone_code=TORONTO_CMA_ZONE_CODE, + zone_name=TORONTO_CMA_ZONE_NAME, + geometry=None, # CMA-level doesn't need geometry + ) + sess.add(new_zone) + sess.flush() + logger.info(f"Created Toronto CMA zone with zone_key={new_zone.zone_key}") + return int(new_zone.zone_key) + + if session: + return _ensure(session) + with get_session() as sess: + result = _ensure(sess) + sess.commit() + return result + + +def load_statcan_cmhc_data( + records: list[Any], # List of CMHCRentalRecord from statcan_cmhc parser + session: Session | None = None, +) -> int: + """Load CMHC rental data from StatCan parser into fact_rentals. + + This function handles CMA-level data from the StatCan API, which provides + aggregate Toronto data rather than zone-level HMIP data. + + Args: + records: List of CMHCRentalRecord dataclass instances from statcan_cmhc parser. + session: Optional existing session. + + Returns: + Number of records loaded. 
+ """ + from portfolio_app.toronto.parsers.statcan_cmhc import ( + CMHCRentalRecord as StatCanRecord, + ) + + def _load(sess: Session) -> int: + # Ensure Toronto CMA zone exists + zone_key = ensure_toronto_cma_zone(sess) + + loaded = 0 + for record in records: + if not isinstance(record, StatCanRecord): + logger.warning(f"Skipping invalid record type: {type(record)}") + continue + + # Generate date key for this record's survey date + survey_date = date(record.year, record.month, 1) + date_key = generate_date_key(survey_date) + + # Verify time dimension exists + time_dim = sess.query(DimTime).filter_by(date_key=date_key).first() + if not time_dim: + logger.warning( + f"Time dimension not found for {survey_date}, skipping record" + ) + continue + + # Create fact record + fact = FactRentals( + date_key=date_key, + zone_key=zone_key, + bedroom_type=record.bedroom_type, + universe=record.universe, + avg_rent=float(record.avg_rent) if record.avg_rent else None, + median_rent=None, # StatCan doesn't provide median + vacancy_rate=float(record.vacancy_rate) + if record.vacancy_rate + else None, + availability_rate=None, + turnover_rate=None, + rent_change_pct=None, + reliability_code=None, + ) + + # Upsert + inserted, updated = upsert_by_key( + sess, FactRentals, [fact], ["date_key", "zone_key", "bedroom_type"] + ) + loaded += inserted + updated + + logger.info(f"Loaded {loaded} CMHC rental records from StatCan") + return loaded + + if session: + return _load(session) + with get_session() as sess: + result = _load(sess) + sess.commit() + return result diff --git a/portfolio_app/toronto/parsers/statcan_cmhc.py b/portfolio_app/toronto/parsers/statcan_cmhc.py new file mode 100644 index 0000000..75d590c --- /dev/null +++ b/portfolio_app/toronto/parsers/statcan_cmhc.py @@ -0,0 +1,383 @@ +"""Parser for CMHC rental data via Statistics Canada API. + +Downloads rental market data (average rent, vacancy rates, universe) +from Statistics Canada's Web Data Service. 
+ +Data Sources: +- Table 34-10-0127: Vacancy rates +- Table 34-10-0129: Rental universe (total units) +- Table 34-10-0133: Average rent by bedroom type +""" + +import contextlib +import io +import logging +import zipfile +from dataclasses import dataclass +from decimal import Decimal +from pathlib import Path +from typing import Any + +import httpx +import pandas as pd + +logger = logging.getLogger(__name__) + +# StatCan Web Data Service endpoints +STATCAN_API_BASE = "https://www150.statcan.gc.ca/t1/wds/rest" +STATCAN_DOWNLOAD_BASE = "https://www150.statcan.gc.ca/n1/tbl/csv" + +# CMHC table IDs +CMHC_TABLES = { + "vacancy": "34100127", + "universe": "34100129", + "rent": "34100133", +} + +# Toronto CMA identifier in StatCan data +TORONTO_DGUID = "2011S0503535" +TORONTO_GEO_NAME = "Toronto, Ontario" + + +@dataclass +class CMHCRentalRecord: + """Rental market record for database loading.""" + + year: int + month: int # CMHC surveys in October, so month=10 + zone_name: str + bedroom_type: str + avg_rent: Decimal | None + vacancy_rate: Decimal | None + universe: int | None + + +class StatCanCMHCParser: + """Parser for CMHC rental data from Statistics Canada. + + Downloads and processes rental market survey data including: + - Average rents by bedroom type + - Vacancy rates + - Rental universe (total units) + + Data is available from 1987 to present, updated annually in January. + """ + + BEDROOM_TYPE_MAP = { + "Bachelor units": "bachelor", + "One bedroom units": "1bed", + "Two bedroom units": "2bed", + "Three bedroom units": "3bed", + "Total": "total", + } + + STRUCTURE_FILTER = "Apartment structures of six units and over" + + def __init__( + self, + cache_dir: Path | None = None, + timeout: float = 60.0, + ) -> None: + """Initialize parser. + + Args: + cache_dir: Optional directory for caching downloaded files. + timeout: HTTP request timeout in seconds. 
+ """ + self._cache_dir = cache_dir + self._timeout = timeout + self._client: httpx.Client | None = None + + @property + def client(self) -> httpx.Client: + """Lazy-initialize HTTP client.""" + if self._client is None: + self._client = httpx.Client( + timeout=self._timeout, + follow_redirects=True, + ) + return self._client + + def close(self) -> None: + """Close HTTP client.""" + if self._client is not None: + self._client.close() + self._client = None + + def __enter__(self) -> "StatCanCMHCParser": + return self + + def __exit__(self, *args: Any) -> None: + self.close() + + def _get_download_url(self, table_id: str) -> str: + """Get CSV download URL for a StatCan table. + + Args: + table_id: StatCan table ID (e.g., "34100133"). + + Returns: + Direct download URL for the CSV zip file. + """ + api_url = f"{STATCAN_API_BASE}/getFullTableDownloadCSV/{table_id}/en" + response = self.client.get(api_url) + response.raise_for_status() + + data = response.json() + if data.get("status") != "SUCCESS": + raise ValueError(f"StatCan API error: {data}") + + return str(data["object"]) + + def _download_table(self, table_id: str) -> pd.DataFrame: + """Download and extract a StatCan table as DataFrame. + + Args: + table_id: StatCan table ID. + + Returns: + DataFrame with table data. 
+ """ + # Check cache first + if self._cache_dir: + cache_file = self._cache_dir / f"{table_id}.csv" + if cache_file.exists(): + logger.debug(f"Loading {table_id} from cache") + return pd.read_csv(cache_file) + + # Get download URL and fetch + download_url = self._get_download_url(table_id) + logger.info(f"Downloading StatCan table {table_id}...") + + response = self.client.get(download_url) + response.raise_for_status() + + # Extract CSV from zip + with zipfile.ZipFile(io.BytesIO(response.content)) as zf: + csv_name = f"{table_id}.csv" + with zf.open(csv_name) as f: + df = pd.read_csv(f) + + # Cache if directory specified + if self._cache_dir: + self._cache_dir.mkdir(parents=True, exist_ok=True) + df.to_csv(self._cache_dir / f"{table_id}.csv", index=False) + + logger.info(f"Downloaded {len(df)} records from table {table_id}") + return df + + def _filter_toronto(self, df: pd.DataFrame) -> pd.DataFrame: + """Filter DataFrame to Toronto CMA only. + + Args: + df: Full StatCan DataFrame. + + Returns: + DataFrame filtered to Toronto. + """ + # Try DGUID first, then GEO name + if "DGUID" in df.columns: + toronto_df = df[df["DGUID"] == TORONTO_DGUID] + if len(toronto_df) > 0: + return toronto_df + + if "GEO" in df.columns: + return df[df["GEO"] == TORONTO_GEO_NAME] + + raise ValueError("Could not identify Toronto data in DataFrame") + + def get_vacancy_rates( + self, + years: list[int] | None = None, + ) -> dict[int, Decimal]: + """Fetch Toronto vacancy rates by year. + + Args: + years: Optional list of years to filter. + + Returns: + Dictionary mapping year to vacancy rate. 
+ """ + df = self._download_table(CMHC_TABLES["vacancy"]) + df = self._filter_toronto(df) + + # Filter years if specified + if years: + df = df[df["REF_DATE"].isin(years)] + + # Extract year -> rate mapping + rates = {} + for _, row in df.iterrows(): + year = int(row["REF_DATE"]) + value = row.get("VALUE") + if pd.notna(value): + rates[year] = Decimal(str(value)) + + logger.info(f"Fetched vacancy rates for {len(rates)} years") + return rates + + def get_rental_universe( + self, + years: list[int] | None = None, + ) -> dict[tuple[int, str], int]: + """Fetch Toronto rental universe (total units) by year and bedroom type. + + Args: + years: Optional list of years to filter. + + Returns: + Dictionary mapping (year, bedroom_type) to unit count. + """ + df = self._download_table(CMHC_TABLES["universe"]) + df = self._filter_toronto(df) + + # Filter to standard apartment structures + if "Type of structure" in df.columns: + df = df[df["Type of structure"] == self.STRUCTURE_FILTER] + + if years: + df = df[df["REF_DATE"].isin(years)] + + universe = {} + for _, row in df.iterrows(): + year = int(row["REF_DATE"]) + bedroom_raw = row.get("Type of unit", "Total") + bedroom = self.BEDROOM_TYPE_MAP.get(bedroom_raw, "other") + value = row.get("VALUE") + + if pd.notna(value) and value is not None: + universe[(year, bedroom)] = int(str(value)) + + logger.info( + f"Fetched rental universe for {len(universe)} year/bedroom combinations" + ) + return universe + + def get_average_rents( + self, + years: list[int] | None = None, + ) -> dict[tuple[int, str], Decimal]: + """Fetch Toronto average rents by year and bedroom type. + + Args: + years: Optional list of years to filter. + + Returns: + Dictionary mapping (year, bedroom_type) to average rent. 
+ """ + df = self._download_table(CMHC_TABLES["rent"]) + df = self._filter_toronto(df) + + # Filter to standard apartment structures (most reliable data) + if "Type of structure" in df.columns: + df = df[df["Type of structure"] == self.STRUCTURE_FILTER] + + if years: + df = df[df["REF_DATE"].isin(years)] + + rents = {} + for _, row in df.iterrows(): + year = int(row["REF_DATE"]) + bedroom_raw = row.get("Type of unit", "Total") + bedroom = self.BEDROOM_TYPE_MAP.get(bedroom_raw, "other") + value = row.get("VALUE") + + if pd.notna(value) and str(value) not in ("F", ".."): + with contextlib.suppress(Exception): + rents[(year, bedroom)] = Decimal(str(value)) + + logger.info(f"Fetched average rents for {len(rents)} year/bedroom combinations") + return rents + + def get_all_rental_data( + self, + start_year: int = 2014, + end_year: int | None = None, + ) -> list[CMHCRentalRecord]: + """Fetch all Toronto rental data and combine into records. + + Args: + start_year: First year to include. + end_year: Last year to include (defaults to current year + 1). + + Returns: + List of CMHCRentalRecord objects ready for database loading. + """ + import datetime + + if end_year is None: + end_year = datetime.date.today().year + 1 + + years = list(range(start_year, end_year + 1)) + + logger.info( + f"Fetching CMHC rental data for Toronto ({start_year}-{end_year})..." 
+ ) + + # Fetch all data types + vacancy_rates = self.get_vacancy_rates(years) + rents = self.get_average_rents(years) + universe = self.get_rental_universe(years) + + # Combine into records + records = [] + bedroom_types = ["bachelor", "1bed", "2bed", "3bed"] + + for year in years: + vacancy = vacancy_rates.get(year) + + for bedroom in bedroom_types: + avg_rent = rents.get((year, bedroom)) + units = universe.get((year, bedroom)) + + # Skip if no rent data for this year/bedroom + if avg_rent is None: + continue + + records.append( + CMHCRentalRecord( + year=year, + month=10, # CMHC surveys in October + zone_name="Toronto CMA", + bedroom_type=bedroom, + avg_rent=avg_rent, + vacancy_rate=vacancy, + universe=units, + ) + ) + + logger.info(f"Created {len(records)} CMHC rental records") + return records + + +def fetch_toronto_rental_data( + start_year: int = 2014, + end_year: int | None = None, + cache_dir: Path | None = None, +) -> list[CMHCRentalRecord]: + """Convenience function to fetch Toronto rental data. + + Args: + start_year: First year to include. + end_year: Last year to include. + cache_dir: Optional cache directory. + + Returns: + List of CMHCRentalRecord objects. + """ + with StatCanCMHCParser(cache_dir=cache_dir) as parser: + return parser.get_all_rental_data(start_year, end_year) + + +if __name__ == "__main__": + # Test the parser + logging.basicConfig(level=logging.INFO) + + records = fetch_toronto_rental_data(start_year=2020) + + print(f"\nFetched {len(records)} records") + print("\nSample records:") + for r in records[:10]: + print( + f" {r.year} {r.bedroom_type}: ${r.avg_rent} rent, {r.vacancy_rate}% vacancy" + ) diff --git a/portfolio_app/toronto/parsers/toronto_open_data.py b/portfolio_app/toronto/parsers/toronto_open_data.py index 110add7..30e756d 100644 --- a/portfolio_app/toronto/parsers/toronto_open_data.py +++ b/portfolio_app/toronto/parsers/toronto_open_data.py @@ -6,6 +6,7 @@ from the City of Toronto's Open Data Portal. 
API Documentation: https://open.toronto.ca/dataset/ """ +import contextlib import json import logging from decimal import Decimal @@ -193,6 +194,9 @@ class TorontoOpenDataParser: def _fetch_geojson(self, package_id: str) -> dict[str, Any]: """Fetch GeoJSON data from a package. + Handles both pure GeoJSON responses and CSV responses with embedded + geometry columns (common in Toronto Open Data). + Args: package_id: The package/dataset ID. @@ -212,16 +216,65 @@ class TorontoOpenDataParser: response = self.client.get(url) response.raise_for_status() - data = response.json() - # Cache the response + # Try to parse as JSON first + try: + data = response.json() + # If it's already a valid GeoJSON FeatureCollection, return it + if isinstance(data, dict) and data.get("type") == "FeatureCollection": + if self._cache_dir: + self._cache_dir.mkdir(parents=True, exist_ok=True) + cache_file = self._cache_dir / f"{package_id}.geojson" + with open(cache_file, "w", encoding="utf-8") as f: + json.dump(data, f) + return dict(data) + except json.JSONDecodeError: + pass + + # If JSON parsing failed, it's likely CSV with embedded geometry + # Parse CSV and convert to GeoJSON FeatureCollection + logger.info("Response is CSV format, converting to GeoJSON...") + import csv + import io + + # Increase field size limit for large geometry columns + csv.field_size_limit(10 * 1024 * 1024) # 10 MB + + csv_text = response.text + reader = csv.DictReader(io.StringIO(csv_text)) + + features = [] + for row in reader: + # Extract geometry from the 'geometry' column if present + geometry = None + if "geometry" in row and row["geometry"]: + with contextlib.suppress(json.JSONDecodeError): + geometry = json.loads(row["geometry"]) + + # Build properties from all other columns + properties = {k: v for k, v in row.items() if k != "geometry"} + + features.append( + { + "type": "Feature", + "geometry": geometry, + "properties": properties, + } + ) + + geojson_data: dict[str, Any] = { + "type": 
"FeatureCollection", + "features": features, + } + + # Cache the converted response if self._cache_dir: self._cache_dir.mkdir(parents=True, exist_ok=True) cache_file = self._cache_dir / f"{package_id}.geojson" with open(cache_file, "w", encoding="utf-8") as f: - json.dump(data, f) + json.dump(geojson_data, f) - return dict(data) + return geojson_data def _fetch_csv_as_json(self, package_id: str) -> list[dict[str, Any]]: """Fetch CSV data as JSON records via CKAN datastore. @@ -282,29 +335,32 @@ class TorontoOpenDataParser: props = feature.get("properties", {}) geometry = feature.get("geometry") - # Extract area_id from various possible property names - area_id = props.get("AREA_ID") or props.get("area_id") - if area_id is None: - # Try AREA_SHORT_CODE as fallback - short_code = props.get("AREA_SHORT_CODE", "") - if short_code: - # Extract numeric part - area_id = int("".join(c for c in short_code if c.isdigit()) or "0") + # Use AREA_SHORT_CODE as the primary ID (1-158 range) + # AREA_ID is a large internal identifier not useful for our schema + short_code = props.get("AREA_SHORT_CODE") or props.get( + "area_short_code", "" + ) + if short_code: + area_id = int("".join(c for c in str(short_code) if c.isdigit()) or "0") + else: + # Fallback to _id (row number) if AREA_SHORT_CODE not available + area_id = int(props.get("_id", 0)) + + if area_id == 0: + logger.warning(f"Skipping neighbourhood with no valid ID: {props}") + continue area_name = ( props.get("AREA_NAME") or props.get("area_name") or f"Neighbourhood {area_id}" ) - area_short_code = props.get("AREA_SHORT_CODE") or props.get( - "area_short_code" - ) records.append( NeighbourhoodRecord( - area_id=int(area_id), + area_id=area_id, area_name=str(area_name), - area_short_code=area_short_code, + area_short_code=str(short_code) if short_code else None, geometry=geometry, ) ) @@ -314,17 +370,17 @@ class TorontoOpenDataParser: # Mapping of indicator names to CensusRecord fields # Keys are partial matches 
(case-insensitive) found in the "Characteristic" column + # Order matters - first match wins, so more specific patterns come first + # Note: owner/renter counts are raw numbers, not percentages - calculated in dbt CENSUS_INDICATOR_MAPPING: dict[str, str] = { "population, 2021": "population", "population, 2016": "population", "population density per square kilometre": "population_density", - "median total income of household": "median_household_income", - "average total income of household": "average_household_income", + "median total income of households in": "median_household_income", + "average total income of households in": "average_household_income", "unemployment rate": "unemployment_rate", "bachelor's degree or higher": "pct_bachelors_or_higher", - "owner": "pct_owner_occupied", - "renter": "pct_renter_occupied", - "median age": "median_age", + "average age": "median_age", "average value of dwellings": "average_dwelling_value", } @@ -358,17 +414,31 @@ class TorontoOpenDataParser: logger.info(f"Fetched {len(raw_records)} census profile rows") # Find the characteristic/indicator column name + # Prioritize "Characteristic" over "Category" since both may exist sample_row = raw_records[0] char_col = None - for col in sample_row: - col_lower = col.lower() - if "characteristic" in col_lower or "category" in col_lower: - char_col = col - break + + # First try exact match for Characteristic + if "Characteristic" in sample_row: + char_col = "Characteristic" + else: + # Fall back to pattern matching + for col in sample_row: + col_lower = col.lower() + if "characteristic" in col_lower: + char_col = col + break + + # Last resort: try Category + if not char_col: + for col in sample_row: + if "category" in col.lower(): + char_col = col + break if not char_col: - # Try common column names - for candidate in ["Characteristic", "Category", "Topic", "_id"]: + # Try other common column names + for candidate in ["Topic", "_id"]: if candidate in sample_row: char_col = candidate 
break diff --git a/portfolio_app/toronto/services/geometry_service.py b/portfolio_app/toronto/services/geometry_service.py index 959ab7f..fec82aa 100644 --- a/portfolio_app/toronto/services/geometry_service.py +++ b/portfolio_app/toronto/services/geometry_service.py @@ -37,7 +37,7 @@ def get_neighbourhoods_geojson(year: int = 2021) -> dict[str, Any]: ST_AsGeoJSON(geometry)::json as geom, population, livability_score - FROM mart_neighbourhood_overview + FROM public_marts.mart_neighbourhood_overview WHERE year = :year AND geometry IS NOT NULL """ diff --git a/portfolio_app/toronto/services/neighbourhood_service.py b/portfolio_app/toronto/services/neighbourhood_service.py index 73f1221..cefe606 100644 --- a/portfolio_app/toronto/services/neighbourhood_service.py +++ b/portfolio_app/toronto/services/neighbourhood_service.py @@ -1,5 +1,6 @@ """Service layer for querying neighbourhood data from dbt marts.""" +import logging from functools import lru_cache from typing import Any @@ -8,6 +9,8 @@ from sqlalchemy import text from portfolio_app.toronto.models import get_engine +logger = logging.getLogger(__name__) + def _execute_query(sql: str, params: dict[str, Any] | None = None) -> pd.DataFrame: """Execute SQL query and return DataFrame. 
@@ -23,8 +26,10 @@ def _execute_query(sql: str, params: dict[str, Any] | None = None) -> pd.DataFra engine = get_engine() with engine.connect() as conn: return pd.read_sql(text(sql), conn, params=params) - except Exception: - # Return empty DataFrame on connection or query error + except Exception as e: + logger.error(f"Query failed: {e}") + logger.debug(f"Failed SQL: {sql}") + logger.debug(f"Params: {params}") return pd.DataFrame() @@ -56,7 +61,7 @@ def get_overview_data(year: int = 2021) -> pd.DataFrame: rent_to_income_pct, avg_rent_2bed, total_amenities_per_1000 - FROM mart_neighbourhood_overview + FROM public_marts.mart_neighbourhood_overview WHERE year = :year ORDER BY livability_score DESC NULLS LAST """ @@ -95,7 +100,7 @@ def get_housing_data(year: int = 2021) -> pd.DataFrame: affordability_index, rent_yoy_change_pct, income_quintile - FROM mart_neighbourhood_housing + FROM public_marts.mart_neighbourhood_housing WHERE year = :year ORDER BY affordability_index ASC NULLS LAST """ @@ -112,26 +117,22 @@ def get_safety_data(year: int = 2021) -> pd.DataFrame: Returns: DataFrame with columns: neighbourhood_id, neighbourhood_name, - total_crime_rate, violent_crime_rate, property_crime_rate, etc. + total_crime_rate, violent_crimes, property_crimes, etc. 
""" sql = """ SELECT neighbourhood_id, neighbourhood_name, year, - total_crimes, + total_incidents as total_crimes, crime_rate_per_100k as total_crime_rate, - violent_crimes, - violent_crime_rate, - property_crimes, - property_crime_rate, - theft_crimes, - theft_rate, - crime_yoy_change_pct, - crime_trend - FROM mart_neighbourhood_safety + assault_count + robbery_count + homicide_count as violent_crimes, + break_enter_count + auto_theft_count as property_crimes, + theft_over_count as theft_crimes, + crime_yoy_change_pct + FROM public_marts.mart_neighbourhood_safety WHERE year = :year - ORDER BY total_crime_rate ASC NULLS LAST + ORDER BY crime_rate_per_100k ASC NULLS LAST """ return _execute_query(sql, {"year": year}) @@ -152,22 +153,22 @@ def get_demographics_data(year: int = 2021) -> pd.DataFrame: SELECT neighbourhood_id, neighbourhood_name, - census_year as year, + year, population, population_density, - population_change_pct, median_household_income, average_household_income, income_quintile, + income_index, median_age, - pct_under_18, - pct_18_to_64, - pct_65_plus, - pct_bachelors_or_higher, + age_index, + pct_owner_occupied, + pct_renter_occupied, + education_bachelors_pct as pct_bachelors_or_higher, unemployment_rate, - diversity_index - FROM mart_neighbourhood_demographics - WHERE census_year = :year + tenure_diversity_index as diversity_index + FROM public_marts.mart_neighbourhood_demographics + WHERE year = :year ORDER BY population DESC NULLS LAST """ return _execute_query(sql, {"year": year}) @@ -183,26 +184,26 @@ def get_amenities_data(year: int = 2021) -> pd.DataFrame: Returns: DataFrame with columns: neighbourhood_id, neighbourhood_name, - amenity_score, parks_per_capita, schools_per_capita, transit_score, etc. + amenity_score, parks_per_1000, schools_per_1000, etc. 
     """
     sql = """
         SELECT
             neighbourhood_id,
             neighbourhood_name,
             year,
-            park_count,
+            parks_count as park_count,
             parks_per_1000,
-            school_count,
+            schools_count as school_count,
             schools_per_1000,
-            childcare_count,
-            childcare_per_1000,
+            transit_count as childcare_count,
+            transit_per_1000 as childcare_per_1000,
             total_amenities,
             total_amenities_per_1000,
-            amenity_score,
-            amenity_rank
+            amenity_index as amenity_score,
+            amenity_tier as amenity_rank
-        FROM mart_neighbourhood_amenities
+        FROM public_marts.mart_neighbourhood_amenities
         WHERE year = :year
-        ORDER BY amenity_score DESC NULLS LAST
+        ORDER BY amenity_index DESC NULLS LAST
     """
     return _execute_query(sql, {"year": year})
@@ -249,17 +250,17 @@
-            a.park_count,
-            a.school_count,
+            a.parks_count as park_count,
+            a.schools_count as school_count,
             a.total_amenities
-        FROM mart_neighbourhood_overview o
-        LEFT JOIN mart_neighbourhood_safety s
+        FROM public_marts.mart_neighbourhood_overview o
+        LEFT JOIN public_marts.mart_neighbourhood_safety s
             ON o.neighbourhood_id = s.neighbourhood_id
             AND o.year = s.year
-        LEFT JOIN mart_neighbourhood_housing h
+        LEFT JOIN public_marts.mart_neighbourhood_housing h
             ON o.neighbourhood_id = h.neighbourhood_id
             AND o.year = h.year
-        LEFT JOIN mart_neighbourhood_demographics d
+        LEFT JOIN public_marts.mart_neighbourhood_demographics d
             ON o.neighbourhood_id = d.neighbourhood_id
-            AND o.year = d.census_year
+            AND o.year = d.year
-        LEFT JOIN mart_neighbourhood_amenities a
+        LEFT JOIN public_marts.mart_neighbourhood_amenities a
             ON o.neighbourhood_id = a.neighbourhood_id
             AND o.year = a.year
         WHERE o.neighbourhood_id = :neighbourhood_id
@@ -288,7 +289,7 @@
             neighbourhood_id,
             neighbourhood_name,
             population
-        FROM mart_neighbourhood_overview
+        FROM public_marts.mart_neighbourhood_overview
         WHERE year = :year
         ORDER BY neighbourhood_name
     """
@@ -317,19 +318,19 @@
     # Map metrics to their source tables
     table_map = {
-        "livability_score": "mart_neighbourhood_overview",
-        "safety_score": "mart_neighbourhood_overview",
-        "affordability_score": "mart_neighbourhood_overview",
-        "amenity_score": "mart_neighbourhood_overview",
-        "crime_rate_per_100k": "mart_neighbourhood_safety",
-        "total_crime_rate": "mart_neighbourhood_safety",
-        "avg_rent_2bed": "mart_neighbourhood_housing",
-        "affordability_index": "mart_neighbourhood_housing",
-        "population": "mart_neighbourhood_demographics",
-        "median_household_income": "mart_neighbourhood_demographics",
+        "livability_score": "public_marts.mart_neighbourhood_overview",
+        "safety_score": "public_marts.mart_neighbourhood_overview",
+        "affordability_score": "public_marts.mart_neighbourhood_overview",
+        "amenity_score": "public_marts.mart_neighbourhood_overview",
+        "crime_rate_per_100k": "public_marts.mart_neighbourhood_safety",
+        "total_crime_rate": "public_marts.mart_neighbourhood_safety",
+        "avg_rent_2bed": "public_marts.mart_neighbourhood_housing",
+        "affordability_index": "public_marts.mart_neighbourhood_housing",
+        "population": "public_marts.mart_neighbourhood_demographics",
+        "median_household_income": "public_marts.mart_neighbourhood_demographics",
     }
-    table = table_map.get(metric, "mart_neighbourhood_overview")
+    table = table_map.get(metric, "public_marts.mart_neighbourhood_overview")
-    year_col = "census_year" if "demographics" in table else "year"
+    year_col = "year"
     order = "ASC" if ascending else "DESC"
@@ -375,7 +376,7 @@
             AVG(crime_rate_per_100k) as avg_crime_rate,
             AVG(avg_rent_2bed) as avg_rent_2bed,
             AVG(rent_to_income_pct) as avg_rent_to_income
-        FROM mart_neighbourhood_overview
+        FROM public_marts.mart_neighbourhood_overview
         WHERE year = :year
     """
     df = _execute_query(sql, {"year": year})
diff --git a/scripts/data/load_toronto_data.py b/scripts/data/load_toronto_data.py
index 63d5353..03a22bc 100644
--- a/scripts/data/load_toronto_data.py
+++ b/scripts/data/load_toronto_data.py
@@ -38,12 +38,16 @@ from portfolio_app.toronto.loaders import ( # 
noqa: E402 load_census_data, load_crime_data, load_neighbourhoods, + load_statcan_cmhc_data, load_time_dimension, ) from portfolio_app.toronto.parsers import ( # noqa: E402 TorontoOpenDataParser, TorontoPoliceParser, ) +from portfolio_app.toronto.parsers.statcan_cmhc import ( # noqa: E402 + fetch_toronto_rental_data, +) from portfolio_app.toronto.schemas import Neighbourhood # noqa: E402 # Configure logging @@ -91,6 +95,9 @@ class DataPipeline: # 5. Load amenities self._load_amenities(session) + # 6. Load CMHC rental data from StatCan + self._load_rentals(session) + session.commit() logger.info("All data committed to database") @@ -241,6 +248,32 @@ class DataPipeline: self.stats["amenities"] = total_count + def _load_rentals(self, session: Any) -> None: + """Fetch and load CMHC rental data from StatCan.""" + logger.info("Fetching CMHC rental data from Statistics Canada...") + + if self.dry_run: + logger.info(" [DRY RUN] Would fetch and load CMHC rental data") + return + + try: + # Fetch rental data (2014-present) + rental_records = fetch_toronto_rental_data(start_year=2014) + + if not rental_records: + logger.warning(" No rental records fetched") + return + + count = load_statcan_cmhc_data(rental_records, session) + self.stats["rentals"] = count + logger.info(f" Loaded {count} CMHC rental records") + except Exception as e: + logger.warning(f" Failed to load CMHC rental data: {e}") + if self.verbose: + import traceback + + traceback.print_exc() + def run_dbt(self) -> bool: """Run dbt to transform data. diff --git a/scripts/db/init_schema.py b/scripts/db/init_schema.py index 6066e04..19ac89d 100644 --- a/scripts/db/init_schema.py +++ b/scripts/db/init_schema.py @@ -25,8 +25,10 @@ def main() -> int: engine = get_engine() # Test connection + from sqlalchemy import text + with engine.connect() as conn: - result = conn.execute("SELECT 1") + result = conn.execute(text("SELECT 1")) result.fetchone() print("Database connection successful")