- Add StatCan CMHC parser to fetch rental data from Statistics Canada API - Create year spine (2014-2025) as time dimension driver instead of census - Add CMA-level rental and income intermediate models - Update mart_neighbourhood_overview to use rental years as base - Fix neighbourhood_service queries to match dbt schema - Add CMHC data loading to pipeline script Data now flows correctly: 158 neighbourhoods × 12 years = 1,896 records Rent data available 2019-2025, crime data 2014-2024 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
262 lines
8.1 KiB
Python
262 lines
8.1 KiB
Python
"""Loader for CMHC rental data into fact_rentals."""
|
|
|
|
import logging
|
|
from datetime import date
|
|
from typing import Any
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from portfolio_app.toronto.models import DimCMHCZone, DimTime, FactRentals
|
|
from portfolio_app.toronto.schemas import CMHCAnnualSurvey, CMHCRentalRecord
|
|
|
|
from .base import get_session, upsert_by_key
|
|
from .dimensions import generate_date_key
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Zone code and display name of the dim_cmhc_zone row used for CMA-level (Toronto-wide) data
|
|
TORONTO_CMA_ZONE_CODE = "TORCMA"
|
|
TORONTO_CMA_ZONE_NAME = "Toronto CMA"
|
|
|
|
|
|
def load_cmhc_rentals(
    survey: CMHCAnnualSurvey,
    session: Session | None = None,
) -> int:
    """Load CMHC annual survey data into fact_rentals.

    Every record of the survey is keyed to one date (October 1st of
    ``survey.survey_year``) and to the zone whose ``zone_code`` matches the
    record. Records whose zone code has no row in dim_cmhc_zone are skipped
    and reported in a single warning.

    Args:
        survey: Validated CMHC annual survey containing records.
        session: Optional existing session. When omitted, a session is
            obtained from ``get_session()`` for the duration of the load.

    Returns:
        Number of records loaded (inserted plus updated).

    Raises:
        ValueError: If dim_time has no row for the survey's date key.
    """

    def _load(sess: Session) -> int:
        # Resolve zone codes to zone keys with a single query.
        zone_map = {
            zone.zone_code: zone.zone_key for zone in sess.query(DimCMHCZone).all()
        }

        # CMHC surveys are annual - use October 1st as reference date
        survey_date = date(survey.survey_year, 10, 1)
        date_key = generate_date_key(survey_date)

        # Verify time dimension exists
        if sess.query(DimTime).filter_by(date_key=date_key).first() is None:
            raise ValueError(
                f"Time dimension not found for date_key {date_key}. "
                "Load time dimension first."
            )

        facts = []
        skipped_count = 0
        skipped_zone_codes = set()
        for record in survey.records:
            zone_key = zone_map.get(record.zone_code)
            # Compare against None so that a zone key of 0 is not mistaken
            # for a missing zone.
            if zone_key is None:
                skipped_count += 1
                skipped_zone_codes.add(record.zone_code)
                continue

            reliability = record.average_rent_reliability
            facts.append(
                FactRentals(
                    date_key=date_key,
                    zone_key=zone_key,
                    bedroom_type=record.bedroom_type.value,
                    universe=record.universe,
                    avg_rent=record.average_rent,
                    median_rent=record.median_rent,
                    vacancy_rate=record.vacancy_rate,
                    availability_rate=record.availability_rate,
                    turnover_rate=record.turnover_rate,
                    rent_change_pct=record.rent_change_pct,
                    reliability_code=reliability.value if reliability else None,
                )
            )

        if skipped_count:
            # Surface dropped rows instead of discarding them silently.
            logger.warning(
                "Skipped %d CMHC record(s) with unknown zone codes: %s",
                skipped_count,
                sorted(str(code) for code in skipped_zone_codes),
            )

        inserted, updated = upsert_by_key(
            sess, FactRentals, facts, ["date_key", "zone_key", "bedroom_type"]
        )
        return inserted + updated

    if session is not None:
        return _load(session)

    # NOTE(review): unlike load_statcan_cmhc_data, this path does not call
    # sess.commit(); confirm that get_session() commits on exit.
    with get_session() as sess:
        return _load(sess)
|
|
|
|
|
|
def load_cmhc_record(
    record: CMHCRentalRecord,
    survey_year: int,
    session: Session | None = None,
) -> int:
    """Store one CMHC rental record in fact_rentals.

    The record is keyed to October 1st of ``survey_year`` and to the zone
    whose ``zone_code`` matches the record.

    Args:
        record: Single validated CMHC rental record.
        survey_year: Year of the survey.
        session: Optional existing session; when not given, one is taken
            from ``get_session()``.

    Returns:
        Number of records loaded (0 or 1). 0 is returned when the record's
        zone code has no matching zone.

    Raises:
        ValueError: If dim_time has no row for the computed date key.
    """
    from datetime import date

    def _store(db: Session) -> int:
        # Look up the zone first; without it there is nothing to load.
        matching_zone = (
            db.query(DimCMHCZone).filter_by(zone_code=record.zone_code).first()
        )
        if not matching_zone:
            return 0

        # Annual survey: October 1st of the survey year is the reference date.
        date_key = generate_date_key(date(survey_year, 10, 1))

        # The fact row needs an existing dim_time entry.
        if not db.query(DimTime).filter_by(date_key=date_key).first():
            raise ValueError(
                f"Time dimension not found for date_key {date_key}. "
                "Load time dimension first."
            )

        reliability = record.average_rent_reliability
        row = FactRentals(
            date_key=date_key,
            zone_key=matching_zone.zone_key,
            bedroom_type=record.bedroom_type.value,
            universe=record.universe,
            avg_rent=record.average_rent,
            median_rent=record.median_rent,
            vacancy_rate=record.vacancy_rate,
            availability_rate=record.availability_rate,
            turnover_rate=record.turnover_rate,
            rent_change_pct=record.rent_change_pct,
            reliability_code=reliability.value if reliability else None,
        )

        key_columns = ["date_key", "zone_key", "bedroom_type"]
        inserted, updated = upsert_by_key(db, FactRentals, [row], key_columns)
        return inserted + updated

    if not session:
        with get_session() as own_session:
            return _store(own_session)
    return _store(session)
|
|
|
|
|
|
def ensure_toronto_cma_zone(session: Session | None = None) -> int:
    """Return the zone_key of the Toronto CMA zone, creating the zone if absent.

    The zone is looked up in dim_cmhc_zone by ``TORONTO_CMA_ZONE_CODE``. If no
    row is found, one is added with ``TORONTO_CMA_ZONE_NAME`` and no geometry,
    and the session is flushed so that its key can be read.

    Args:
        session: Optional existing session. When given, nothing is committed
            here. When omitted, a session from ``get_session()`` is used and
            committed before returning.

    Returns:
        The zone_key for Toronto CMA.
    """

    def _get_or_create(db: Session) -> int:
        existing = (
            db.query(DimCMHCZone).filter_by(zone_code=TORONTO_CMA_ZONE_CODE).first()
        )
        if not existing:
            new_zone = DimCMHCZone(
                zone_code=TORONTO_CMA_ZONE_CODE,
                zone_name=TORONTO_CMA_ZONE_NAME,
                geometry=None,  # CMA-level doesn't need geometry
            )
            db.add(new_zone)
            # Flush before reading zone_key from the newly added row.
            db.flush()
            logger.info(f"Created Toronto CMA zone with zone_key={new_zone.zone_key}")
            return int(new_zone.zone_key)
        return int(existing.zone_key)

    if not session:
        with get_session() as own_session:
            zone_key = _get_or_create(own_session)
            own_session.commit()
            return zone_key
    return _get_or_create(session)
|
|
|
|
|
|
def load_statcan_cmhc_data(
    records: list[Any],  # List of CMHCRentalRecord from statcan_cmhc parser
    session: Session | None = None,
) -> int:
    """Load CMHC rental data from StatCan parser into fact_rentals.

    This function handles CMA-level data from the StatCan API, which provides
    aggregate Toronto data rather than zone-level HMIP data. Every record is
    attached to the Toronto CMA zone (created on demand) and to the first day
    of the record's year/month.

    Items that are not StatCan ``CMHCRentalRecord`` instances, and records
    whose date has no row in dim_time, are skipped with a warning.

    Args:
        records: List of CMHCRentalRecord dataclass instances from statcan_cmhc parser.
        session: Optional existing session. When given, nothing is committed
            here. When omitted, a session from ``get_session()`` is used and
            committed before returning.

    Returns:
        Number of records loaded (inserted plus updated).
    """
    from portfolio_app.toronto.parsers.statcan_cmhc import (
        CMHCRentalRecord as StatCanRecord,
    )

    def _optional_float(value: Any) -> float | None:
        # Only a missing value maps to NULL; a genuine 0 must survive.
        if value is None or value == "":
            return None
        return float(value)

    def _load(sess: Session) -> int:
        # Ensure Toronto CMA zone exists
        zone_key = ensure_toronto_cma_zone(sess)

        # date_key -> whether dim_time has a row for it. Records sharing a
        # month then cost one lookup instead of one query each.
        time_dim_exists: dict[Any, bool] = {}

        loaded = 0
        for record in records:
            if not isinstance(record, StatCanRecord):
                logger.warning("Skipping invalid record type: %s", type(record))
                continue

            # Generate date key for this record's survey date
            survey_date = date(record.year, record.month, 1)
            date_key = generate_date_key(survey_date)

            # Verify time dimension exists
            if date_key not in time_dim_exists:
                time_dim_exists[date_key] = (
                    sess.query(DimTime).filter_by(date_key=date_key).first()
                    is not None
                )
            if not time_dim_exists[date_key]:
                logger.warning(
                    "Time dimension not found for %s, skipping record", survey_date
                )
                continue

            # Create fact record
            fact = FactRentals(
                date_key=date_key,
                zone_key=zone_key,
                bedroom_type=record.bedroom_type,
                universe=record.universe,
                # A zero or missing average rent is stored as NULL, as before.
                avg_rent=float(record.avg_rent) if record.avg_rent else None,
                median_rent=None,  # StatCan doesn't provide median
                # A 0.0 vacancy rate is a real measurement and is kept.
                vacancy_rate=_optional_float(record.vacancy_rate),
                availability_rate=None,
                turnover_rate=None,
                rent_change_pct=None,
                reliability_code=None,
            )

            # Upsert
            inserted, updated = upsert_by_key(
                sess, FactRentals, [fact], ["date_key", "zone_key", "bedroom_type"]
            )
            loaded += inserted + updated

        logger.info("Loaded %s CMHC rental records from StatCan", loaded)
        return loaded

    if session is not None:
        return _load(session)

    with get_session() as sess:
        result = _load(sess)
        sess.commit()
        return result
|