Sprint 4 implementation:

Loaders:
- base.py: Session management, bulk insert, and upsert utilities
- dimensions.py: Load time, district, zone, neighbourhood, and policy dimensions
- trreb.py: Load TRREB purchase data into fact_purchases
- cmhc.py: Load CMHC rental data into fact_rentals

dbt project:
- Project configuration (dbt_project.yml, packages.yml)
- Staging models for all fact and dimension tables
- Intermediate models with dimension enrichment
- Marts: purchase analysis, rental analysis, market summary

Closes #16

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
130 lines
3.9 KiB
Python
130 lines
3.9 KiB
Python
"""Loader for TRREB purchase data into fact_purchases."""
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from portfolio_app.toronto.models import DimTime, DimTRREBDistrict, FactPurchases
|
|
from portfolio_app.toronto.schemas import TRREBMonthlyRecord, TRREBMonthlyReport
|
|
|
|
from .base import get_session, upsert_by_key
|
|
from .dimensions import generate_date_key
|
|
|
|
|
|
def load_trreb_purchases(
    report: TRREBMonthlyReport,
    session: Session | None = None,
) -> int:
    """Load TRREB monthly report data into fact_purchases.

    Every record in the report is stamped with the date key derived from
    ``report.report_date`` and with the surrogate key of the TRREB district
    whose ``district_code`` equals the record's ``area_code``. Records whose
    area code matches no district (e.g. aggregate rows) are skipped. The
    resulting rows are written with ``upsert_by_key`` keyed on
    ``(date_key, district_key)``.

    Args:
        report: Validated TRREB monthly report containing records.
        session: Optional existing session. When omitted, a session is
            obtained from ``get_session()`` for the duration of the load.

    Returns:
        Number of records loaded (inserted count plus updated count as
        reported by ``upsert_by_key``).

    Raises:
        ValueError: If no ``DimTime`` row exists for the report's date key.
    """

    def _load(sess: Session) -> int:
        # Build the district_code -> district_key map once so each record
        # below is resolved with a dict lookup instead of a query.
        district_map = {
            district.district_code: district.district_key
            for district in sess.query(DimTRREBDistrict).all()
        }

        # All records in a monthly report share the report's date.
        date_key = generate_date_key(report.report_date)

        # Fail fast when the time dimension has not been loaded yet.
        time_dim = sess.query(DimTime).filter_by(date_key=date_key).first()
        if time_dim is None:
            raise ValueError(
                f"Time dimension not found for date_key {date_key}. "
                "Load time dimension first."
            )

        facts: list[FactPurchases] = []
        for record in report.records:
            district_key = district_map.get(record.area_code)
            # Compare against None rather than truthiness so that a valid
            # surrogate key of 0 is not mistaken for an unknown district.
            if district_key is None:
                # Skip records for unknown districts (e.g., aggregate rows)
                continue

            facts.append(
                FactPurchases(
                    date_key=date_key,
                    district_key=district_key,
                    sales_count=record.sales,
                    dollar_volume=record.dollar_volume,
                    avg_price=record.avg_price,
                    median_price=record.median_price,
                    new_listings=record.new_listings,
                    active_listings=record.active_listings,
                    avg_dom=record.avg_dom,
                    avg_sp_lp=record.avg_sp_lp,
                )
            )

        inserted, updated = upsert_by_key(
            sess, FactPurchases, facts, ["date_key", "district_key"]
        )
        return inserted + updated

    # Explicit None check: the parameter is typed ``Session | None``, so
    # only an omitted session should trigger opening a new one.
    if session is not None:
        return _load(session)

    with get_session() as sess:
        return _load(sess)
|
|
|
|
|
|
def load_trreb_record(
    record: TRREBMonthlyRecord,
    session: Session | None = None,
) -> int:
    """Load a single TRREB record into fact_purchases.

    The record's ``area_code`` is resolved against ``DimTRREBDistrict``
    (matching ``district_code``). If no district matches, nothing is written.
    Otherwise one ``FactPurchases`` row is built for the record's date and
    upserted on ``(date_key, district_key)``.

    Args:
        record: Single validated TRREB monthly record.
        session: Optional existing session. When not supplied (falsy), a
            session is obtained from ``get_session()`` for this call.

    Returns:
        0 when the area code matches no district; otherwise the inserted
        count plus the updated count reported by ``upsert_by_key``
        (expected to be 1 for a single row).

    Raises:
        ValueError: If no ``DimTime`` row exists for the record's date key.
    """

    def _load(sess: Session) -> int:
        # Resolve the record's area code to its district row.
        matching_district = (
            sess.query(DimTRREBDistrict)
            .filter_by(district_code=record.area_code)
            .first()
        )
        if not matching_district:
            # Unknown area code: nothing to load.
            return 0

        date_key = generate_date_key(record.report_date)

        # The time dimension must already be populated for this date.
        if not sess.query(DimTime).filter_by(date_key=date_key).first():
            raise ValueError(
                f"Time dimension not found for date_key {date_key}. "
                "Load time dimension first."
            )

        purchase_row = FactPurchases(
            date_key=date_key,
            district_key=matching_district.district_key,
            sales_count=record.sales,
            dollar_volume=record.dollar_volume,
            avg_price=record.avg_price,
            median_price=record.median_price,
            new_listings=record.new_listings,
            active_listings=record.active_listings,
            avg_dom=record.avg_dom,
            avg_sp_lp=record.avg_sp_lp,
        )

        n_inserted, n_updated = upsert_by_key(
            sess, FactPurchases, [purchase_row], ["date_key", "district_key"]
        )
        return n_inserted + n_updated

    # No usable caller-provided session: open one just for this load.
    if not session:
        with get_session() as fresh_session:
            return _load(fresh_session)

    return _load(session)
|