Files
lmiranda d0f32edba7 fix: Repair data pipeline with StatCan CMHC rental data
- Add StatCan CMHC parser to fetch rental data from Statistics Canada API
- Create year spine (2014-2025) as time dimension driver instead of census
- Add CMA-level rental and income intermediate models
- Update mart_neighbourhood_overview to use rental years as base
- Fix neighbourhood_service queries to match dbt schema
- Add CMHC data loading to pipeline script

Data now flows correctly: 158 neighbourhoods × 12 years = 1,896 records
Rent data available 2019-2025, crime data 2014-2024

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-17 15:38:31 -05:00

262 lines
8.1 KiB
Python

"""Loader for CMHC rental data into fact_rentals."""
import logging
from datetime import date
from typing import Any
from sqlalchemy.orm import Session
from portfolio_app.toronto.models import DimCMHCZone, DimTime, FactRentals
from portfolio_app.toronto.schemas import CMHCAnnualSurvey, CMHCRentalRecord
from .base import get_session, upsert_by_key
from .dimensions import generate_date_key
logger = logging.getLogger(__name__)
# Toronto CMA zone code for CMA-level data
TORONTO_CMA_ZONE_CODE = "TORCMA"
TORONTO_CMA_ZONE_NAME = "Toronto CMA"
def load_cmhc_rentals(
survey: CMHCAnnualSurvey,
session: Session | None = None,
) -> int:
"""Load CMHC annual survey data into fact_rentals.
Args:
survey: Validated CMHC annual survey containing records.
session: Optional existing session.
Returns:
Number of records loaded.
"""
from datetime import date
def _load(sess: Session) -> int:
# Get zone key mapping
zones = sess.query(DimCMHCZone).all()
zone_map = {z.zone_code: z.zone_key for z in zones}
# CMHC surveys are annual - use October 1st as reference date
survey_date = date(survey.survey_year, 10, 1)
date_key = generate_date_key(survey_date)
# Verify time dimension exists
time_dim = sess.query(DimTime).filter_by(date_key=date_key).first()
if not time_dim:
raise ValueError(
f"Time dimension not found for date_key {date_key}. "
"Load time dimension first."
)
records = []
for record in survey.records:
zone_key = zone_map.get(record.zone_code)
if not zone_key:
# Skip records for unknown zones
continue
fact = FactRentals(
date_key=date_key,
zone_key=zone_key,
bedroom_type=record.bedroom_type.value,
universe=record.universe,
avg_rent=record.average_rent,
median_rent=record.median_rent,
vacancy_rate=record.vacancy_rate,
availability_rate=record.availability_rate,
turnover_rate=record.turnover_rate,
rent_change_pct=record.rent_change_pct,
reliability_code=record.average_rent_reliability.value
if record.average_rent_reliability
else None,
)
records.append(fact)
inserted, updated = upsert_by_key(
sess, FactRentals, records, ["date_key", "zone_key", "bedroom_type"]
)
return inserted + updated
if session:
return _load(session)
with get_session() as sess:
return _load(sess)
def load_cmhc_record(
record: CMHCRentalRecord,
survey_year: int,
session: Session | None = None,
) -> int:
"""Load a single CMHC record into fact_rentals.
Args:
record: Single validated CMHC rental record.
survey_year: Year of the survey.
session: Optional existing session.
Returns:
Number of records loaded (0 or 1).
"""
from datetime import date
def _load(sess: Session) -> int:
# Get zone key
zone = sess.query(DimCMHCZone).filter_by(zone_code=record.zone_code).first()
if not zone:
return 0
survey_date = date(survey_year, 10, 1)
date_key = generate_date_key(survey_date)
# Verify time dimension exists
time_dim = sess.query(DimTime).filter_by(date_key=date_key).first()
if not time_dim:
raise ValueError(
f"Time dimension not found for date_key {date_key}. "
"Load time dimension first."
)
fact = FactRentals(
date_key=date_key,
zone_key=zone.zone_key,
bedroom_type=record.bedroom_type.value,
universe=record.universe,
avg_rent=record.average_rent,
median_rent=record.median_rent,
vacancy_rate=record.vacancy_rate,
availability_rate=record.availability_rate,
turnover_rate=record.turnover_rate,
rent_change_pct=record.rent_change_pct,
reliability_code=record.average_rent_reliability.value
if record.average_rent_reliability
else None,
)
inserted, updated = upsert_by_key(
sess, FactRentals, [fact], ["date_key", "zone_key", "bedroom_type"]
)
return inserted + updated
if session:
return _load(session)
with get_session() as sess:
return _load(sess)
def ensure_toronto_cma_zone(session: Session | None = None) -> int:
"""Ensure Toronto CMA zone exists in dim_cmhc_zone.
Creates the zone if it doesn't exist.
Args:
session: Optional existing session.
Returns:
The zone_key for Toronto CMA.
"""
def _ensure(sess: Session) -> int:
zone = (
sess.query(DimCMHCZone).filter_by(zone_code=TORONTO_CMA_ZONE_CODE).first()
)
if zone:
return int(zone.zone_key)
# Create new zone
new_zone = DimCMHCZone(
zone_code=TORONTO_CMA_ZONE_CODE,
zone_name=TORONTO_CMA_ZONE_NAME,
geometry=None, # CMA-level doesn't need geometry
)
sess.add(new_zone)
sess.flush()
logger.info(f"Created Toronto CMA zone with zone_key={new_zone.zone_key}")
return int(new_zone.zone_key)
if session:
return _ensure(session)
with get_session() as sess:
result = _ensure(sess)
sess.commit()
return result
def load_statcan_cmhc_data(
records: list[Any], # List of CMHCRentalRecord from statcan_cmhc parser
session: Session | None = None,
) -> int:
"""Load CMHC rental data from StatCan parser into fact_rentals.
This function handles CMA-level data from the StatCan API, which provides
aggregate Toronto data rather than zone-level HMIP data.
Args:
records: List of CMHCRentalRecord dataclass instances from statcan_cmhc parser.
session: Optional existing session.
Returns:
Number of records loaded.
"""
from portfolio_app.toronto.parsers.statcan_cmhc import (
CMHCRentalRecord as StatCanRecord,
)
def _load(sess: Session) -> int:
# Ensure Toronto CMA zone exists
zone_key = ensure_toronto_cma_zone(sess)
loaded = 0
for record in records:
if not isinstance(record, StatCanRecord):
logger.warning(f"Skipping invalid record type: {type(record)}")
continue
# Generate date key for this record's survey date
survey_date = date(record.year, record.month, 1)
date_key = generate_date_key(survey_date)
# Verify time dimension exists
time_dim = sess.query(DimTime).filter_by(date_key=date_key).first()
if not time_dim:
logger.warning(
f"Time dimension not found for {survey_date}, skipping record"
)
continue
# Create fact record
fact = FactRentals(
date_key=date_key,
zone_key=zone_key,
bedroom_type=record.bedroom_type,
universe=record.universe,
avg_rent=float(record.avg_rent) if record.avg_rent else None,
median_rent=None, # StatCan doesn't provide median
vacancy_rate=float(record.vacancy_rate)
if record.vacancy_rate
else None,
availability_rate=None,
turnover_rate=None,
rent_change_pct=None,
reliability_code=None,
)
# Upsert
inserted, updated = upsert_by_key(
sess, FactRentals, [fact], ["date_key", "zone_key", "bedroom_type"]
)
loaded += inserted + updated
logger.info(f"Loaded {loaded} CMHC rental records from StatCan")
return loaded
if session:
return _load(session)
with get_session() as sess:
result = _load(sess)
sess.commit()
return result