Files
personal-portfolio/portfolio_app/toronto/loaders/cmhc.py
lmiranda 457bb49395 feat: add loaders and dbt models for Toronto housing data
Sprint 4 implementation:

Loaders:
- base.py: Session management, bulk insert, upsert utilities
- dimensions.py: Load time, district, zone, neighbourhood, policy dimensions
- trreb.py: Load TRREB purchase data to fact_purchases
- cmhc.py: Load CMHC rental data to fact_rentals

dbt Project:
- Project configuration (dbt_project.yml, packages.yml)
- Staging models for all fact and dimension tables
- Intermediate models with dimension enrichment
- Marts: purchase analysis, rental analysis, market summary

Closes #16

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 16:07:30 -05:00

138 lines
4.3 KiB
Python

"""Loader for CMHC rental data into fact_rentals."""
from sqlalchemy.orm import Session
from portfolio_app.toronto.models import DimCMHCZone, DimTime, FactRentals
from portfolio_app.toronto.schemas import CMHCAnnualSurvey, CMHCRentalRecord
from .base import get_session, upsert_by_key
from .dimensions import generate_date_key
def load_cmhc_rentals(
    survey: CMHCAnnualSurvey,
    session: Session | None = None,
) -> int:
    """Load CMHC annual survey data into fact_rentals.

    Every record is stamped with one date key derived from October 1st of
    ``survey.survey_year``. Records whose ``zone_code`` has no matching
    ``DimCMHCZone`` row are skipped and reported in a single warning log
    entry; the remaining records are upserted on
    ``(date_key, zone_key, bedroom_type)``.

    Args:
        survey: Validated CMHC annual survey containing records.
        session: Optional existing session. When omitted, a session is
            obtained from ``get_session()`` for the duration of the load.

    Returns:
        Number of records loaded (inserted plus updated).

    Raises:
        ValueError: If no ``DimTime`` row exists for the survey's date key.
    """
    import logging
    from datetime import date

    logger = logging.getLogger(__name__)

    def _load(sess: Session) -> int:
        # Get zone key mapping (zone_code -> zone_key) in one query.
        zones = sess.query(DimCMHCZone).all()
        zone_map = {z.zone_code: z.zone_key for z in zones}

        # CMHC surveys are annual - use October 1st as reference date
        survey_date = date(survey.survey_year, 10, 1)
        date_key = generate_date_key(survey_date)

        # Verify time dimension exists
        time_dim = sess.query(DimTime).filter_by(date_key=date_key).first()
        if time_dim is None:
            raise ValueError(
                f"Time dimension not found for date_key {date_key}. "
                "Load time dimension first."
            )

        records = []
        unknown_zone_codes = []
        for record in survey.records:
            zone_key = zone_map.get(record.zone_code)
            # Compare against None explicitly so a zone key of 0 is not
            # mistaken for a missing zone.
            if zone_key is None:
                unknown_zone_codes.append(record.zone_code)
                continue

            reliability = record.average_rent_reliability
            records.append(
                FactRentals(
                    date_key=date_key,
                    zone_key=zone_key,
                    bedroom_type=record.bedroom_type.value,
                    universe=record.universe,
                    avg_rent=record.average_rent,
                    median_rent=record.median_rent,
                    vacancy_rate=record.vacancy_rate,
                    availability_rate=record.availability_rate,
                    turnover_rate=record.turnover_rate,
                    rent_change_pct=record.rent_change_pct,
                    # Only an absent reliability maps to NULL; a member whose
                    # value happens to be falsy is still stored.
                    reliability_code=(
                        reliability.value if reliability is not None else None
                    ),
                )
            )

        if unknown_zone_codes:
            # Skipping stays best-effort, but leave a trace so dropped rows
            # are not silently lost.
            logger.warning(
                "Skipped %d CMHC record(s) with unknown zone codes: %s",
                len(unknown_zone_codes),
                sorted({str(code) for code in unknown_zone_codes}),
            )

        if not records:
            # Nothing survived filtering; there is nothing to upsert.
            return 0

        inserted, updated = upsert_by_key(
            sess, FactRentals, records, ["date_key", "zone_key", "bedroom_type"]
        )
        return inserted + updated

    if session is not None:
        return _load(session)
    with get_session() as sess:
        return _load(sess)
def load_cmhc_record(
    record: CMHCRentalRecord,
    survey_year: int,
    session: Session | None = None,
) -> int:
    """Load a single CMHC record into fact_rentals.

    The record is stamped with the date key derived from October 1st of
    ``survey_year`` and upserted on ``(date_key, zone_key, bedroom_type)``.

    Args:
        record: Single validated CMHC rental record.
        survey_year: Year of the survey.
        session: Optional existing session. When omitted, a session is
            obtained from ``get_session()`` for the duration of the load.

    Returns:
        Number of records loaded (0 or 1). 0 is returned when no
        ``DimCMHCZone`` row matches ``record.zone_code``.

    Raises:
        ValueError: If the zone exists but no ``DimTime`` row exists for the
            survey's date key.
    """
    from datetime import date

    def _load(sess: Session) -> int:
        # Get zone key
        zone = sess.query(DimCMHCZone).filter_by(zone_code=record.zone_code).first()
        if zone is None:
            # Unknown zone: nothing to load.
            return 0

        # Same October 1st reference date as the annual survey loader.
        survey_date = date(survey_year, 10, 1)
        date_key = generate_date_key(survey_date)

        # Verify time dimension exists
        time_dim = sess.query(DimTime).filter_by(date_key=date_key).first()
        if time_dim is None:
            raise ValueError(
                f"Time dimension not found for date_key {date_key}. "
                "Load time dimension first."
            )

        reliability = record.average_rent_reliability
        fact = FactRentals(
            date_key=date_key,
            zone_key=zone.zone_key,
            bedroom_type=record.bedroom_type.value,
            universe=record.universe,
            avg_rent=record.average_rent,
            median_rent=record.median_rent,
            vacancy_rate=record.vacancy_rate,
            availability_rate=record.availability_rate,
            turnover_rate=record.turnover_rate,
            rent_change_pct=record.rent_change_pct,
            # Only an absent reliability maps to NULL; a member whose value
            # happens to be falsy is still stored.
            reliability_code=reliability.value if reliability is not None else None,
        )
        inserted, updated = upsert_by_key(
            sess, FactRentals, [fact], ["date_key", "zone_key", "bedroom_type"]
        )
        return inserted + updated

    # Explicit None check: only a missing session triggers creating a new one.
    if session is not None:
        return _load(session)
    with get_session() as sess:
        return _load(sess)