feat: add loaders and dbt models for Toronto housing data

Sprint 4 implementation:

Loaders:
- base.py: Session management, bulk insert, upsert utilities
- dimensions.py: Load time, district, zone, neighbourhood, policy dimensions
- trreb.py: Load TRREB purchase data to fact_purchases
- cmhc.py: Load CMHC rental data to fact_rentals

dbt Project:
- Project configuration (dbt_project.yml, packages.yml)
- Staging models for all fact and dimension tables
- Intermediate models with dimension enrichment
- Marts: purchase analysis, rental analysis, market summary

Closes #16

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-11 16:07:30 -05:00
parent 88e23674a8
commit 457bb49395
22 changed files with 1320 additions and 0 deletions

View File

@@ -1 +1,32 @@
"""Database loaders for Toronto housing data."""
from .base import bulk_insert, get_session, upsert_by_key
from .cmhc import load_cmhc_record, load_cmhc_rentals
from .dimensions import (
generate_date_key,
load_cmhc_zones,
load_neighbourhoods,
load_policy_events,
load_time_dimension,
load_trreb_districts,
)
from .trreb import load_trreb_purchases, load_trreb_record
# Public API of the loaders package. Every name listed here is imported above
# from one of the .base, .dimensions, .trreb or .cmhc submodules.
__all__ = [
# Base utilities
"get_session",
"bulk_insert",
"upsert_by_key",
# Dimension loaders
"generate_date_key",
"load_time_dimension",
"load_trreb_districts",
"load_cmhc_zones",
"load_neighbourhoods",
"load_policy_events",
# Fact loaders
"load_trreb_purchases",
"load_trreb_record",
"load_cmhc_rentals",
"load_cmhc_record",
]

View File

@@ -0,0 +1,85 @@
"""Base loader utilities for database operations."""
from collections.abc import Generator
from contextlib import contextmanager
from typing import Any, TypeVar
from sqlalchemy.orm import Session
from portfolio_app.toronto.models import get_session_factory
T = TypeVar("T")
@contextmanager
def get_session() -> Generator[Session, None, None]:
    """Provide a transactional database session scope.

    A new session is created from the factory returned by
    ``get_session_factory()`` each time the context is entered.

    Yields:
        SQLAlchemy session. It is committed when the ``with`` body finishes
        normally, rolled back (and the exception re-raised) when an
        ``Exception`` escapes the body or the commit, and closed in all cases.
    """
    db_session = get_session_factory()()
    try:
        yield db_session
        # Commit stays inside the try so a failing commit is rolled back too.
        db_session.commit()
    except Exception:
        db_session.rollback()
        raise
    finally:
        db_session.close()
def bulk_insert(session: Session, objects: list[T]) -> int:
    """Add a batch of ORM instances to the session and flush them.

    The session is flushed but not committed here; committing is left to
    whoever owns the session.

    Args:
        session: Active SQLAlchemy session.
        objects: List of ORM model instances to insert.

    Returns:
        Number of objects passed in.
    """
    session.add_all(objects)
    session.flush()
    inserted_count = len(objects)
    return inserted_count
def upsert_by_key(
    session: Session,
    model_class: Any,
    objects: list[T],
    key_columns: list[str],
) -> tuple[int, int]:
    """Insert or update ORM instances matched on a set of key columns.

    For every object the session is queried for a ``model_class`` row whose
    ``key_columns`` equal the object's values. When a row is found, its
    non-key, non-primary-key columns are overwritten with the object's
    values (including ``None``); otherwise the object itself is added to the
    session. The session is flushed once at the end and is not committed.

    Args:
        session: Active SQLAlchemy session.
        model_class: The ORM model class (must expose ``__table__`` when an
            update is needed).
        objects: List of ORM model instances to upsert.
        key_columns: Column names that form the unique key.

    Returns:
        Tuple of (inserted_count, updated_count).
    """
    key_names = set(key_columns)
    # Resolved lazily on the first update so insert-only calls never touch
    # ``model_class.__table__``.
    updatable: list[str] | None = None
    inserted = 0
    updated = 0

    for obj in objects:
        # Look up an existing row by the natural key of the incoming object.
        filters = {col: getattr(obj, col) for col in key_columns}
        existing = session.query(model_class).filter_by(**filters).first()

        if existing is None:
            session.add(obj)
            inserted += 1
            continue

        if updatable is None:
            # Skip primary-key columns as well as the lookup keys: a freshly
            # built object carries no value for a generated surrogate key
            # (which need not be named "id"), and copying that unset value
            # would null out the existing row's identity.
            updatable = [
                column.name
                for column in model_class.__table__.columns
                if column.name not in key_names
                and column.name != "id"
                and not column.primary_key
            ]
        for name in updatable:
            setattr(existing, name, getattr(obj, name))
        updated += 1

    session.flush()
    return inserted, updated

View File

@@ -0,0 +1,137 @@
"""Loader for CMHC rental data into fact_rentals."""
from sqlalchemy.orm import Session
from portfolio_app.toronto.models import DimCMHCZone, DimTime, FactRentals
from portfolio_app.toronto.schemas import CMHCAnnualSurvey, CMHCRentalRecord
from .base import get_session, upsert_by_key
from .dimensions import generate_date_key
def load_cmhc_rentals(
    survey: CMHCAnnualSurvey,
    session: Session | None = None,
) -> int:
    """Load CMHC annual survey data into fact_rentals.

    Records are upserted on (date_key, zone_key, bedroom_type). Records whose
    ``zone_code`` has no row in the CMHC zone dimension are skipped.

    Args:
        survey: Validated CMHC annual survey containing records.
        session: Optional existing session. When omitted, a session is
            opened via ``get_session()`` for the duration of the load.

    Returns:
        Number of records loaded (inserted plus updated).

    Raises:
        ValueError: If the time dimension has no row for October 1st of
            ``survey.survey_year``.
    """
    from datetime import date

    def _load(sess: Session) -> int:
        # Get zone key mapping
        zone_map = {z.zone_code: z.zone_key for z in sess.query(DimCMHCZone).all()}

        # CMHC surveys are annual - use October 1st as reference date
        survey_date = date(survey.survey_year, 10, 1)
        date_key = generate_date_key(survey_date)

        # Verify time dimension exists
        time_dim = sess.query(DimTime).filter_by(date_key=date_key).first()
        if not time_dim:
            raise ValueError(
                f"Time dimension not found for date_key {date_key}. "
                "Load time dimension first."
            )

        records = []
        for record in survey.records:
            zone_key = zone_map.get(record.zone_code)
            # Compare against None explicitly: a truthiness test would also
            # drop a legitimate zone whose key happens to be 0.
            if zone_key is None:
                # Skip records for unknown zones
                continue
            records.append(
                FactRentals(
                    date_key=date_key,
                    zone_key=zone_key,
                    bedroom_type=record.bedroom_type.value,
                    universe=record.universe,
                    avg_rent=record.average_rent,
                    median_rent=record.median_rent,
                    vacancy_rate=record.vacancy_rate,
                    availability_rate=record.availability_rate,
                    turnover_rate=record.turnover_rate,
                    rent_change_pct=record.rent_change_pct,
                    reliability_code=record.average_rent_reliability.value
                    if record.average_rent_reliability
                    else None,
                )
            )

        inserted, updated = upsert_by_key(
            sess, FactRentals, records, ["date_key", "zone_key", "bedroom_type"]
        )
        return inserted + updated

    if session is not None:
        return _load(session)
    with get_session() as sess:
        return _load(sess)
def load_cmhc_record(
    record: CMHCRentalRecord,
    survey_year: int,
    session: Session | None = None,
) -> int:
    """Upsert one CMHC rental record into fact_rentals.

    The record is keyed on (date_key, zone_key, bedroom_type), with the date
    key derived from October 1st of ``survey_year``.

    Args:
        record: Single validated CMHC rental record.
        survey_year: Year of the survey.
        session: Optional existing session; when not supplied one is opened
            via ``get_session()``.

    Returns:
        Number of records loaded (0 or 1). 0 means the record's zone code is
        not present in the CMHC zone dimension.

    Raises:
        ValueError: If the time dimension has no row for the survey date.
    """
    from datetime import date

    def _load(sess: Session) -> int:
        # Resolve the zone first; an unknown zone means nothing to load.
        matched_zone = (
            sess.query(DimCMHCZone).filter_by(zone_code=record.zone_code).first()
        )
        if not matched_zone:
            return 0

        date_key = generate_date_key(date(survey_year, 10, 1))

        # The fact row needs a matching time dimension entry.
        if not sess.query(DimTime).filter_by(date_key=date_key).first():
            raise ValueError(
                f"Time dimension not found for date_key {date_key}. "
                "Load time dimension first."
            )

        rental_fact = FactRentals(
            date_key=date_key,
            zone_key=matched_zone.zone_key,
            bedroom_type=record.bedroom_type.value,
            universe=record.universe,
            avg_rent=record.average_rent,
            median_rent=record.median_rent,
            vacancy_rate=record.vacancy_rate,
            availability_rate=record.availability_rate,
            turnover_rate=record.turnover_rate,
            rent_change_pct=record.rent_change_pct,
            reliability_code=(
                record.average_rent_reliability.value
                if record.average_rent_reliability
                else None
            ),
        )
        new_count, changed_count = upsert_by_key(
            sess,
            FactRentals,
            [rental_fact],
            ["date_key", "zone_key", "bedroom_type"],
        )
        return new_count + changed_count

    if not session:
        with get_session() as owned_session:
            return _load(owned_session)
    return _load(session)

View File

@@ -0,0 +1,251 @@
"""Loaders for dimension tables."""
from datetime import date
from sqlalchemy.orm import Session
from portfolio_app.toronto.models import (
DimCMHCZone,
DimNeighbourhood,
DimPolicyEvent,
DimTime,
DimTRREBDistrict,
)
from portfolio_app.toronto.schemas import (
CMHCZone,
Neighbourhood,
PolicyEvent,
TRREBDistrict,
)
from .base import get_session, upsert_by_key
def generate_date_key(d: date) -> int:
    """Build the integer key for a date by packing it as YYYYMMDD.

    Args:
        d: Date to convert.

    Returns:
        Integer in YYYYMMDD format, e.g. 2024-10-01 becomes 20241001.
    """
    year_month = d.year * 100 + d.month
    return year_month * 100 + d.day
def load_time_dimension(
    start_date: date,
    end_date: date,
    session: Session | None = None,
) -> int:
    """Populate the time dimension with one row per month.

    Rows are generated for the first day of each month, starting from the
    first of ``start_date``'s month and continuing while that day is not
    after ``end_date``. Rows are upserted on ``date_key``.

    Args:
        start_date: Start of date range (moved back to the 1st of its month).
        end_date: End of date range (inclusive).
        session: Optional existing session; when not supplied one is opened
            via ``get_session()``.

    Returns:
        Number of records loaded (inserted plus updated).
    """
    # Index 0 is a placeholder so a month number indexes its name directly.
    month_names = (
        "",
        "January",
        "February",
        "March",
        "April",
        "May",
        "June",
        "July",
        "August",
        "September",
        "October",
        "November",
        "December",
    )

    def _load(sess: Session) -> int:
        rows = []
        cursor = start_date.replace(day=1)
        while cursor <= end_date:
            rows.append(
                DimTime(
                    date_key=generate_date_key(cursor),
                    full_date=cursor,
                    year=cursor.year,
                    month=cursor.month,
                    quarter=(cursor.month - 1) // 3 + 1,
                    month_name=month_names[cursor.month],
                    is_month_start=True,
                )
            )
            # Step to the 1st of the next month; December carries into the
            # following year (divmod(12, 12) == (1, 0)).
            year_carry, month_index = divmod(cursor.month, 12)
            cursor = cursor.replace(
                year=cursor.year + year_carry, month=month_index + 1
            )

        new_count, changed_count = upsert_by_key(sess, DimTime, rows, ["date_key"])
        return new_count + changed_count

    if not session:
        with get_session() as owned_session:
            return _load(owned_session)
    return _load(session)
def load_trreb_districts(
    districts: list[TRREBDistrict],
    session: Session | None = None,
) -> int:
    """Upsert TRREB districts into the district dimension.

    Rows are matched on ``district_code``.

    Args:
        districts: List of validated district schemas.
        session: Optional existing session; when not supplied one is opened
            via ``get_session()``.

    Returns:
        Number of records loaded (inserted plus updated).
    """

    def _load(sess: Session) -> int:
        rows = [
            DimTRREBDistrict(
                district_code=item.district_code,
                district_name=item.district_name,
                area_type=item.area_type.value,
                geometry=item.geometry_wkt,
            )
            for item in districts
        ]
        new_count, changed_count = upsert_by_key(
            sess, DimTRREBDistrict, rows, ["district_code"]
        )
        return new_count + changed_count

    if not session:
        with get_session() as owned_session:
            return _load(owned_session)
    return _load(session)
def load_cmhc_zones(
    zones: list[CMHCZone],
    session: Session | None = None,
) -> int:
    """Upsert CMHC zones into the zone dimension.

    Rows are matched on ``zone_code``.

    Args:
        zones: List of validated zone schemas.
        session: Optional existing session; when not supplied one is opened
            via ``get_session()``.

    Returns:
        Number of records loaded (inserted plus updated).
    """

    def _load(sess: Session) -> int:
        rows = [
            DimCMHCZone(
                zone_code=item.zone_code,
                zone_name=item.zone_name,
                geometry=item.geometry_wkt,
            )
            for item in zones
        ]
        new_count, changed_count = upsert_by_key(
            sess, DimCMHCZone, rows, ["zone_code"]
        )
        return new_count + changed_count

    if not session:
        with get_session() as owned_session:
            return _load(owned_session)
    return _load(session)
def load_neighbourhoods(
    neighbourhoods: list[Neighbourhood],
    session: Session | None = None,
) -> int:
    """Upsert neighbourhoods into the neighbourhood dimension.

    Rows are matched on ``neighbourhood_id``.

    Args:
        neighbourhoods: List of validated neighbourhood schemas.
        session: Optional existing session; when not supplied one is opened
            via ``get_session()``.

    Returns:
        Number of records loaded (inserted plus updated).
    """

    def _load(sess: Session) -> int:
        rows = [
            DimNeighbourhood(
                neighbourhood_id=item.neighbourhood_id,
                name=item.name,
                geometry=item.geometry_wkt,
                population=item.population,
                land_area_sqkm=item.land_area_sqkm,
                pop_density_per_sqkm=item.pop_density_per_sqkm,
                pct_bachelors_or_higher=item.pct_bachelors_or_higher,
                median_household_income=item.median_household_income,
                pct_owner_occupied=item.pct_owner_occupied,
                pct_renter_occupied=item.pct_renter_occupied,
                census_year=item.census_year,
            )
            for item in neighbourhoods
        ]
        new_count, changed_count = upsert_by_key(
            sess, DimNeighbourhood, rows, ["neighbourhood_id"]
        )
        return new_count + changed_count

    if not session:
        with get_session() as owned_session:
            return _load(owned_session)
    return _load(session)
def load_policy_events(
    events: list[PolicyEvent],
    session: Session | None = None,
) -> int:
    """Upsert policy events into the policy event dimension.

    Args:
        events: List of validated policy event schemas.
        session: Optional existing session; when not supplied one is opened
            via ``get_session()``.

    Returns:
        Number of records loaded (inserted plus updated).
    """

    def _load(sess: Session) -> int:
        rows = [
            DimPolicyEvent(
                event_date=item.event_date,
                effective_date=item.effective_date,
                level=item.level.value,
                category=item.category.value,
                title=item.title,
                description=item.description,
                expected_direction=item.expected_direction.value,
                source_url=item.source_url,
                confidence=item.confidence.value,
            )
            for item in events
        ]
        # Policy events are matched on the event_date + title pair, which
        # serves as their unique key for the upsert.
        new_count, changed_count = upsert_by_key(
            sess, DimPolicyEvent, rows, ["event_date", "title"]
        )
        return new_count + changed_count

    if not session:
        with get_session() as owned_session:
            return _load(owned_session)
    return _load(session)

View File

@@ -0,0 +1,129 @@
"""Loader for TRREB purchase data into fact_purchases."""
from sqlalchemy.orm import Session
from portfolio_app.toronto.models import DimTime, DimTRREBDistrict, FactPurchases
from portfolio_app.toronto.schemas import TRREBMonthlyRecord, TRREBMonthlyReport
from .base import get_session, upsert_by_key
from .dimensions import generate_date_key
def load_trreb_purchases(
    report: TRREBMonthlyReport,
    session: Session | None = None,
) -> int:
    """Load TRREB monthly report data into fact_purchases.

    Records are upserted on (date_key, district_key). Records whose
    ``area_code`` has no row in the TRREB district dimension are skipped.

    Args:
        report: Validated TRREB monthly report containing records.
        session: Optional existing session. When omitted, a session is
            opened via ``get_session()`` for the duration of the load.

    Returns:
        Number of records loaded (inserted plus updated).

    Raises:
        ValueError: If the time dimension has no row for the date key of
            ``report.report_date``.
    """

    def _load(sess: Session) -> int:
        # Get district key mapping
        district_map = {
            d.district_code: d.district_key
            for d in sess.query(DimTRREBDistrict).all()
        }

        # Build date key from report date
        date_key = generate_date_key(report.report_date)

        # Verify time dimension exists
        time_dim = sess.query(DimTime).filter_by(date_key=date_key).first()
        if not time_dim:
            raise ValueError(
                f"Time dimension not found for date_key {date_key}. "
                "Load time dimension first."
            )

        records = []
        for record in report.records:
            district_key = district_map.get(record.area_code)
            # Compare against None explicitly: a truthiness test would also
            # drop a legitimate district whose key happens to be 0.
            if district_key is None:
                # Skip records for unknown districts (e.g., aggregate rows)
                continue
            records.append(
                FactPurchases(
                    date_key=date_key,
                    district_key=district_key,
                    sales_count=record.sales,
                    dollar_volume=record.dollar_volume,
                    avg_price=record.avg_price,
                    median_price=record.median_price,
                    new_listings=record.new_listings,
                    active_listings=record.active_listings,
                    avg_dom=record.avg_dom,
                    avg_sp_lp=record.avg_sp_lp,
                )
            )

        inserted, updated = upsert_by_key(
            sess, FactPurchases, records, ["date_key", "district_key"]
        )
        return inserted + updated

    if session is not None:
        return _load(session)
    with get_session() as sess:
        return _load(sess)
def load_trreb_record(
    record: TRREBMonthlyRecord,
    session: Session | None = None,
) -> int:
    """Upsert one TRREB monthly record into fact_purchases.

    The record is keyed on (date_key, district_key), with the date key
    derived from ``record.report_date``.

    Args:
        record: Single validated TRREB monthly record.
        session: Optional existing session; when not supplied one is opened
            via ``get_session()``.

    Returns:
        Number of records loaded (0 or 1). 0 means the record's area code is
        not present in the TRREB district dimension.

    Raises:
        ValueError: If the time dimension has no row for the record's date.
    """

    def _load(sess: Session) -> int:
        # Resolve the district first; an unknown district means nothing to load.
        district_query = sess.query(DimTRREBDistrict)
        matched_district = district_query.filter_by(
            district_code=record.area_code
        ).first()
        if not matched_district:
            return 0

        date_key = generate_date_key(record.report_date)

        # The fact row needs a matching time dimension entry.
        if not sess.query(DimTime).filter_by(date_key=date_key).first():
            raise ValueError(
                f"Time dimension not found for date_key {date_key}. "
                "Load time dimension first."
            )

        purchase_fact = FactPurchases(
            date_key=date_key,
            district_key=matched_district.district_key,
            sales_count=record.sales,
            dollar_volume=record.dollar_volume,
            avg_price=record.avg_price,
            median_price=record.median_price,
            new_listings=record.new_listings,
            active_listings=record.active_listings,
            avg_dom=record.avg_dom,
            avg_sp_lp=record.avg_sp_lp,
        )
        new_count, changed_count = upsert_by_key(
            sess, FactPurchases, [purchase_fact], ["date_key", "district_key"]
        )
        return new_count + changed_count

    if not session:
        with get_session() as owned_session:
            return _load(owned_session)
    return _load(session)