Sprint 4 implementation: Loaders: - base.py: Session management, bulk insert, upsert utilities - dimensions.py: Load time, district, zone, neighbourhood, policy dimensions - trreb.py: Load TRREB purchase data to fact_purchases - cmhc.py: Load CMHC rental data to fact_rentals dbt Project: - Project configuration (dbt_project.yml, packages.yml) - Staging models for all fact and dimension tables - Intermediate models with dimension enrichment - Marts: purchase analysis, rental analysis, market summary Closes #16 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
138 lines
4.3 KiB
Python
138 lines
4.3 KiB
Python
"""Loader for CMHC rental data into fact_rentals."""
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from portfolio_app.toronto.models import DimCMHCZone, DimTime, FactRentals
|
|
from portfolio_app.toronto.schemas import CMHCAnnualSurvey, CMHCRentalRecord
|
|
|
|
from .base import get_session, upsert_by_key
|
|
from .dimensions import generate_date_key
|
|
|
|
|
|
def load_cmhc_rentals(
    survey: CMHCAnnualSurvey,
    session: Session | None = None,
) -> int:
    """Load CMHC annual survey data into fact_rentals.

    Each record in ``survey.records`` whose ``zone_code`` has a matching row
    in ``DimCMHCZone`` is turned into a ``FactRentals`` row and handed to
    ``upsert_by_key`` with ``(date_key, zone_key, bedroom_type)`` as the
    match key. Records whose zone code is not in the dimension are skipped
    and reported in a single warning log entry.

    Args:
        survey: Validated CMHC annual survey containing records.
        session: Optional existing session. When omitted, a session is
            obtained from ``get_session()`` for the duration of the load.

    Returns:
        Number of records loaded (inserted plus updated, as reported by
        ``upsert_by_key``).

    Raises:
        ValueError: If ``DimTime`` has no row for the survey's reference
            date (October 1st of ``survey.survey_year``).
    """
    import logging
    from datetime import date

    def _load(sess: Session) -> int:
        # Get zone key mapping (natural zone code -> surrogate zone key)
        zone_map = {
            zone.zone_code: zone.zone_key
            for zone in sess.query(DimCMHCZone).all()
        }

        # CMHC surveys are annual - use October 1st as reference date
        survey_date = date(survey.survey_year, 10, 1)
        date_key = generate_date_key(survey_date)

        # Verify time dimension exists
        time_dim = sess.query(DimTime).filter_by(date_key=date_key).first()
        if time_dim is None:
            raise ValueError(
                f"Time dimension not found for date_key {date_key}. "
                "Load time dimension first."
            )

        records = []
        skipped_zone_codes = []
        for record in survey.records:
            zone_key = zone_map.get(record.zone_code)
            # Compare against None (not truthiness) so that a legitimate
            # zone_key of 0 is not mistaken for an unknown zone.
            if zone_key is None:
                # Skip records for unknown zones
                skipped_zone_codes.append(record.zone_code)
                continue

            reliability = record.average_rent_reliability
            records.append(
                FactRentals(
                    date_key=date_key,
                    zone_key=zone_key,
                    bedroom_type=record.bedroom_type.value,
                    universe=record.universe,
                    avg_rent=record.average_rent,
                    median_rent=record.median_rent,
                    vacancy_rate=record.vacancy_rate,
                    availability_rate=record.availability_rate,
                    turnover_rate=record.turnover_rate,
                    rent_change_pct=record.rent_change_pct,
                    reliability_code=(
                        reliability.value if reliability is not None else None
                    ),
                )
            )

        if skipped_zone_codes:
            # Make dropped rows visible: the return value alone cannot tell
            # the caller that part of the survey was not loaded.
            logging.getLogger(__name__).warning(
                "Skipped %d CMHC record(s) with unknown zone codes: %s",
                len(skipped_zone_codes),
                list(dict.fromkeys(skipped_zone_codes)),
            )

        inserted, updated = upsert_by_key(
            sess, FactRentals, records, ["date_key", "zone_key", "bedroom_type"]
        )
        return inserted + updated

    # Reuse the caller's session when one was supplied; otherwise open one.
    if session is not None:
        return _load(session)
    with get_session() as sess:
        return _load(sess)
|
|
|
|
|
|
def load_cmhc_record(
    record: CMHCRentalRecord,
    survey_year: int,
    session: Session | None = None,
) -> int:
    """Load a single CMHC record into fact_rentals.

    The record is turned into a ``FactRentals`` row and handed to
    ``upsert_by_key`` with ``(date_key, zone_key, bedroom_type)`` as the
    match key.

    Args:
        record: Single validated CMHC rental record.
        survey_year: Year of the survey.
        session: Optional existing session. When omitted, a session is
            obtained from ``get_session()`` for the duration of the load.

    Returns:
        Number of records loaded (0 or 1). 0 is returned when the record's
        ``zone_code`` has no matching row in ``DimCMHCZone``.

    Raises:
        ValueError: If ``DimTime`` has no row for the survey's reference
            date (October 1st of ``survey_year``).
    """
    from datetime import date

    def _load(sess: Session) -> int:
        # Get zone key; an unknown zone means there is nothing to load.
        zone = sess.query(DimCMHCZone).filter_by(zone_code=record.zone_code).first()
        if zone is None:
            return 0

        # CMHC surveys are annual - use October 1st as reference date
        survey_date = date(survey_year, 10, 1)
        date_key = generate_date_key(survey_date)

        # Verify time dimension exists
        time_dim = sess.query(DimTime).filter_by(date_key=date_key).first()
        if time_dim is None:
            raise ValueError(
                f"Time dimension not found for date_key {date_key}. "
                "Load time dimension first."
            )

        reliability = record.average_rent_reliability
        fact = FactRentals(
            date_key=date_key,
            zone_key=zone.zone_key,
            bedroom_type=record.bedroom_type.value,
            universe=record.universe,
            avg_rent=record.average_rent,
            median_rent=record.median_rent,
            vacancy_rate=record.vacancy_rate,
            availability_rate=record.availability_rate,
            turnover_rate=record.turnover_rate,
            rent_change_pct=record.rent_change_pct,
            reliability_code=(
                reliability.value if reliability is not None else None
            ),
        )

        inserted, updated = upsert_by_key(
            sess, FactRentals, [fact], ["date_key", "zone_key", "bedroom_type"]
        )
        return inserted + updated

    # Reuse the caller's session when one was supplied; otherwise open one.
    if session is not None:
        return _load(session)
    with get_session() as sess:
        return _load(sess)
|