2 Commits

Author SHA1 Message Date
b7907e68e4 Merge pull request 'feat: Sprint 4 - Loaders and dbt models' (#17) from feature/sprint4-loaders-dbt into development 2026-01-11 21:08:01 +00:00
457bb49395 feat: add loaders and dbt models for Toronto housing data
Sprint 4 implementation:

Loaders:
- base.py: Session management, bulk insert, upsert utilities
- dimensions.py: Load time, district, zone, neighbourhood, policy dimensions
- trreb.py: Load TRREB purchase data to fact_purchases
- cmhc.py: Load CMHC rental data to fact_rentals

dbt Project:
- Project configuration (dbt_project.yml, packages.yml)
- Staging models for all fact and dimension tables
- Intermediate models with dimension enrichment
- Marts: purchase analysis, rental analysis, market summary

Closes #16

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 16:07:30 -05:00
22 changed files with 1320 additions and 0 deletions

28
dbt/dbt_project.yml Normal file
View File

@@ -0,0 +1,28 @@
name: 'toronto_housing'
version: '1.0.0'
config-version: 2
profile: 'toronto_housing'
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
clean-targets:
- "target"
- "dbt_packages"
models:
toronto_housing:
staging:
+materialized: view
+schema: staging
intermediate:
+materialized: view
+schema: intermediate
marts:
+materialized: table
+schema: marts

View File

@@ -0,0 +1,24 @@
version: 2
models:
- name: int_purchases__monthly
description: "Purchase data enriched with time and district dimensions"
columns:
- name: purchase_id
tests:
- unique
- not_null
- name: district_code
tests:
- not_null
- name: int_rentals__annual
description: "Rental data enriched with time and zone dimensions"
columns:
- name: rental_id
tests:
- unique
- not_null
- name: zone_code
tests:
- not_null

View File

@@ -0,0 +1,62 @@
-- Intermediate: Monthly purchase data enriched with dimensions
-- Joins purchases with time and district dimensions for analysis
with purchases as (
select * from {{ ref('stg_trreb__purchases') }}
),
time_dim as (
select * from {{ ref('stg_dimensions__time') }}
),
district_dim as (
select * from {{ ref('stg_dimensions__trreb_districts') }}
),
enriched as (
select
p.purchase_id,
-- Time attributes
t.date_key,
t.full_date,
t.year,
t.month,
t.quarter,
t.month_name,
-- District attributes
d.district_key,
d.district_code,
d.district_name,
d.area_type,
-- Metrics
p.sales_count,
p.dollar_volume,
p.avg_price,
p.median_price,
p.new_listings,
p.active_listings,
p.days_on_market,
p.sale_to_list_ratio,
-- Calculated metrics
case
when p.active_listings > 0
then round(p.sales_count::numeric / p.active_listings, 3)
else null
end as absorption_rate,
case
when p.sales_count > 0
then round(p.active_listings::numeric / p.sales_count, 1)
else null
end as months_of_inventory
from purchases p
inner join time_dim t on p.date_key = t.date_key
inner join district_dim d on p.district_key = d.district_key
)
select * from enriched

View File

@@ -0,0 +1,57 @@
-- Intermediate: Annual rental data enriched with dimensions
-- Joins rentals with time and zone dimensions for analysis
with rentals as (
select * from {{ ref('stg_cmhc__rentals') }}
),
time_dim as (
select * from {{ ref('stg_dimensions__time') }}
),
zone_dim as (
select * from {{ ref('stg_dimensions__cmhc_zones') }}
),
enriched as (
select
r.rental_id,
-- Time attributes
t.date_key,
t.full_date,
t.year,
t.month,
t.quarter,
-- Zone attributes
z.zone_key,
z.zone_code,
z.zone_name,
-- Bedroom type
r.bedroom_type,
-- Metrics
r.rental_universe,
r.avg_rent,
r.median_rent,
r.vacancy_rate,
r.availability_rate,
r.turnover_rate,
r.year_over_year_rent_change,
r.reliability_code,
-- Calculated metrics
case
when r.rental_universe > 0 and r.vacancy_rate is not null
then round(r.rental_universe * (r.vacancy_rate / 100), 0)
else null
end as vacant_units_estimate
from rentals r
inner join time_dim t on r.date_key = t.date_key
inner join zone_dim z on r.zone_key = z.zone_key
)
select * from enriched

View File

@@ -0,0 +1,23 @@
version: 2
models:
- name: mart_toronto_purchases
description: "Final mart for Toronto purchase/sales analysis by district and time"
columns:
- name: purchase_id
description: "Unique purchase record identifier"
tests:
- unique
- not_null
- name: mart_toronto_rentals
description: "Final mart for Toronto rental market analysis by zone and time"
columns:
- name: rental_id
description: "Unique rental record identifier"
tests:
- unique
- not_null
- name: mart_toronto_market_summary
description: "Combined market summary aggregating purchases and rentals at Toronto level"

View File

@@ -0,0 +1,81 @@
-- Mart: Toronto Market Summary
-- Aggregated view combining purchase and rental market indicators
-- Grain: One row per year-month
with purchases_agg as (
select
year,
month,
month_name,
quarter,
-- Aggregate purchase metrics across all districts
sum(sales_count) as total_sales,
sum(dollar_volume) as total_dollar_volume,
round(avg(avg_price), 0) as avg_price_all_districts,
round(avg(median_price), 0) as median_price_all_districts,
sum(new_listings) as total_new_listings,
sum(active_listings) as total_active_listings,
round(avg(days_on_market), 0) as avg_days_on_market,
round(avg(sale_to_list_ratio), 2) as avg_sale_to_list_ratio,
round(avg(absorption_rate), 3) as avg_absorption_rate,
round(avg(months_of_inventory), 1) as avg_months_of_inventory,
round(avg(avg_price_yoy_pct), 2) as avg_price_yoy_pct
from {{ ref('mart_toronto_purchases') }}
group by year, month, month_name, quarter
),
rentals_agg as (
select
year,
-- Aggregate rental metrics across all zones (all bedroom types)
round(avg(avg_rent), 0) as avg_rent_all_zones,
round(avg(vacancy_rate), 2) as avg_vacancy_rate,
round(avg(rent_change_pct), 2) as avg_rent_change_pct,
sum(rental_universe) as total_rental_universe
from {{ ref('mart_toronto_rentals') }}
group by year
),
final as (
select
p.year,
p.month,
p.month_name,
p.quarter,
-- Purchase market indicators
p.total_sales,
p.total_dollar_volume,
p.avg_price_all_districts,
p.median_price_all_districts,
p.total_new_listings,
p.total_active_listings,
p.avg_days_on_market,
p.avg_sale_to_list_ratio,
p.avg_absorption_rate,
p.avg_months_of_inventory,
p.avg_price_yoy_pct,
-- Rental market indicators (annual, so join on year)
r.avg_rent_all_zones,
r.avg_vacancy_rate,
r.avg_rent_change_pct,
r.total_rental_universe,
-- Affordability indicator (price to rent ratio)
case
when r.avg_rent_all_zones > 0
then round(p.avg_price_all_districts / (r.avg_rent_all_zones * 12), 1)
else null
end as price_to_annual_rent_ratio
from purchases_agg p
left join rentals_agg r on p.year = r.year
)
select * from final
order by year desc, month desc

View File

@@ -0,0 +1,79 @@
-- Mart: Toronto Purchase Market Analysis
-- Final analytical table for purchase/sales data visualization
-- Grain: One row per district per month
with purchases as (
select * from {{ ref('int_purchases__monthly') }}
),
-- Add year-over-year calculations
with_yoy as (
select
p.*,
-- Previous year same month values
lag(p.avg_price, 12) over (
partition by p.district_code
order by p.date_key
) as avg_price_prev_year,
lag(p.sales_count, 12) over (
partition by p.district_code
order by p.date_key
) as sales_count_prev_year,
lag(p.median_price, 12) over (
partition by p.district_code
order by p.date_key
) as median_price_prev_year
from purchases p
),
final as (
select
purchase_id,
date_key,
full_date,
year,
month,
quarter,
month_name,
district_key,
district_code,
district_name,
area_type,
sales_count,
dollar_volume,
avg_price,
median_price,
new_listings,
active_listings,
days_on_market,
sale_to_list_ratio,
absorption_rate,
months_of_inventory,
-- Year-over-year changes
case
when avg_price_prev_year > 0
then round(((avg_price - avg_price_prev_year) / avg_price_prev_year) * 100, 2)
else null
end as avg_price_yoy_pct,
case
when sales_count_prev_year > 0
then round(((sales_count - sales_count_prev_year)::numeric / sales_count_prev_year) * 100, 2)
else null
end as sales_count_yoy_pct,
case
when median_price_prev_year > 0
then round(((median_price - median_price_prev_year) / median_price_prev_year) * 100, 2)
else null
end as median_price_yoy_pct
from with_yoy
)
select * from final

View File

@@ -0,0 +1,64 @@
-- Mart: Toronto Rental Market Analysis
-- Final analytical table for rental market visualization
-- Grain: One row per zone per bedroom type per survey year
with rentals as (
select * from {{ ref('int_rentals__annual') }}
),
-- Add year-over-year calculations
with_yoy as (
select
r.*,
-- Previous year values
lag(r.avg_rent, 1) over (
partition by r.zone_code, r.bedroom_type
order by r.year
) as avg_rent_prev_year,
lag(r.vacancy_rate, 1) over (
partition by r.zone_code, r.bedroom_type
order by r.year
) as vacancy_rate_prev_year
from rentals r
),
final as (
select
rental_id,
date_key,
full_date,
year,
quarter,
zone_key,
zone_code,
zone_name,
bedroom_type,
rental_universe,
avg_rent,
median_rent,
vacancy_rate,
availability_rate,
turnover_rate,
year_over_year_rent_change,
reliability_code,
vacant_units_estimate,
-- Calculated year-over-year (if not provided)
coalesce(
year_over_year_rent_change,
case
when avg_rent_prev_year > 0
then round(((avg_rent - avg_rent_prev_year) / avg_rent_prev_year) * 100, 2)
else null
end
) as rent_change_pct,
vacancy_rate - vacancy_rate_prev_year as vacancy_rate_change
from with_yoy
)
select * from final

View File

@@ -0,0 +1,61 @@
version: 2
sources:
- name: toronto_housing
description: "Toronto housing data loaded from TRREB and CMHC sources"
database: portfolio
schema: public
tables:
- name: fact_purchases
description: "TRREB monthly purchase/sales statistics by district"
columns:
- name: id
description: "Primary key"
- name: date_key
description: "Foreign key to dim_time"
- name: district_key
description: "Foreign key to dim_trreb_district"
- name: fact_rentals
description: "CMHC annual rental survey data by zone and bedroom type"
columns:
- name: id
description: "Primary key"
- name: date_key
description: "Foreign key to dim_time"
- name: zone_key
description: "Foreign key to dim_cmhc_zone"
- name: dim_time
description: "Time dimension (monthly grain)"
columns:
- name: date_key
description: "Primary key (YYYYMMDD format)"
- name: dim_trreb_district
description: "TRREB district dimension with geometry"
columns:
- name: district_key
description: "Primary key"
- name: district_code
description: "TRREB district code"
- name: dim_cmhc_zone
description: "CMHC zone dimension with geometry"
columns:
- name: zone_key
description: "Primary key"
- name: zone_code
description: "CMHC zone code"
- name: dim_neighbourhood
description: "City of Toronto neighbourhoods (reference only)"
columns:
- name: neighbourhood_id
description: "Primary key"
- name: dim_policy_event
description: "Housing policy events for annotation"
columns:
- name: event_id
description: "Primary key"

View File

@@ -0,0 +1,73 @@
version: 2
models:
- name: stg_trreb__purchases
description: "Staged TRREB purchase/sales data from fact_purchases"
columns:
- name: purchase_id
description: "Unique identifier for purchase record"
tests:
- unique
- not_null
- name: date_key
description: "Date dimension key (YYYYMMDD)"
tests:
- not_null
- name: district_key
description: "TRREB district dimension key"
tests:
- not_null
- name: stg_cmhc__rentals
description: "Staged CMHC rental market data from fact_rentals"
columns:
- name: rental_id
description: "Unique identifier for rental record"
tests:
- unique
- not_null
- name: date_key
description: "Date dimension key (YYYYMMDD)"
tests:
- not_null
- name: zone_key
description: "CMHC zone dimension key"
tests:
- not_null
- name: stg_dimensions__time
description: "Staged time dimension"
columns:
- name: date_key
description: "Date dimension key (YYYYMMDD)"
tests:
- unique
- not_null
- name: stg_dimensions__trreb_districts
description: "Staged TRREB district dimension"
columns:
- name: district_key
description: "District dimension key"
tests:
- unique
- not_null
- name: district_code
description: "TRREB district code (e.g., W01, C01)"
tests:
- unique
- not_null
- name: stg_dimensions__cmhc_zones
description: "Staged CMHC zone dimension"
columns:
- name: zone_key
description: "Zone dimension key"
tests:
- unique
- not_null
- name: zone_code
description: "CMHC zone code"
tests:
- unique
- not_null

View File

@@ -0,0 +1,26 @@
-- Staged CMHC rental market survey data
-- Source: fact_rentals table loaded from CMHC CSV exports
-- Grain: One row per zone per bedroom type per survey year
with source as (
select * from {{ source('toronto_housing', 'fact_rentals') }}
),
staged as (
select
id as rental_id,
date_key,
zone_key,
bedroom_type,
universe as rental_universe,
avg_rent,
median_rent,
vacancy_rate,
availability_rate,
turnover_rate,
rent_change_pct as year_over_year_rent_change,
reliability_code
from source
)
select * from staged

View File

@@ -0,0 +1,18 @@
-- Staged CMHC zone dimension
-- Source: dim_cmhc_zone table
-- Grain: One row per zone
with source as (
select * from {{ source('toronto_housing', 'dim_cmhc_zone') }}
),
staged as (
select
zone_key,
zone_code,
zone_name,
geometry
from source
)
select * from staged

View File

@@ -0,0 +1,21 @@
-- Staged time dimension
-- Source: dim_time table
-- Grain: One row per month
with source as (
select * from {{ source('toronto_housing', 'dim_time') }}
),
staged as (
select
date_key,
full_date,
year,
month,
quarter,
month_name,
is_month_start
from source
)
select * from staged

View File

@@ -0,0 +1,19 @@
-- Staged TRREB district dimension
-- Source: dim_trreb_district table
-- Grain: One row per district
with source as (
select * from {{ source('toronto_housing', 'dim_trreb_district') }}
),
staged as (
select
district_key,
district_code,
district_name,
area_type,
geometry
from source
)
select * from staged

View File

@@ -0,0 +1,25 @@
-- Staged TRREB purchase/sales data
-- Source: fact_purchases table loaded from TRREB Market Watch PDFs
-- Grain: One row per district per month
with source as (
select * from {{ source('toronto_housing', 'fact_purchases') }}
),
staged as (
select
id as purchase_id,
date_key,
district_key,
sales_count,
dollar_volume,
avg_price,
median_price,
new_listings,
active_listings,
avg_dom as days_on_market,
avg_sp_lp as sale_to_list_ratio
from source
)
select * from staged

5
dbt/packages.yml Normal file
View File

@@ -0,0 +1,5 @@
packages:
- package: dbt-labs/dbt_utils
version: ">=1.0.0"
- package: calogica/dbt_expectations
version: ">=0.10.0"

21
dbt/profiles.yml.example Normal file
View File

@@ -0,0 +1,21 @@
toronto_housing:
target: dev
outputs:
dev:
type: postgres
host: localhost
user: portfolio
password: "{{ env_var('POSTGRES_PASSWORD') }}"
port: 5432
dbname: portfolio
schema: public
threads: 4
prod:
type: postgres
host: "{{ env_var('POSTGRES_HOST') }}"
user: "{{ env_var('POSTGRES_USER') }}"
password: "{{ env_var('POSTGRES_PASSWORD') }}"
port: 5432
dbname: portfolio
schema: public
threads: 4

View File

@@ -1 +1,32 @@
"""Database loaders for Toronto housing data.""" """Database loaders for Toronto housing data."""
from .base import bulk_insert, get_session, upsert_by_key
from .cmhc import load_cmhc_record, load_cmhc_rentals
from .dimensions import (
generate_date_key,
load_cmhc_zones,
load_neighbourhoods,
load_policy_events,
load_time_dimension,
load_trreb_districts,
)
from .trreb import load_trreb_purchases, load_trreb_record
__all__ = [
# Base utilities
"get_session",
"bulk_insert",
"upsert_by_key",
# Dimension loaders
"generate_date_key",
"load_time_dimension",
"load_trreb_districts",
"load_cmhc_zones",
"load_neighbourhoods",
"load_policy_events",
# Fact loaders
"load_trreb_purchases",
"load_trreb_record",
"load_cmhc_rentals",
"load_cmhc_record",
]

View File

@@ -0,0 +1,85 @@
"""Base loader utilities for database operations."""
from collections.abc import Generator
from contextlib import contextmanager
from typing import Any, TypeVar
from sqlalchemy.orm import Session
from portfolio_app.toronto.models import get_session_factory
T = TypeVar("T")
@contextmanager
def get_session() -> Generator[Session, None, None]:
"""Get a database session with automatic cleanup.
Yields:
SQLAlchemy session that auto-commits on success, rollbacks on error.
"""
session_factory = get_session_factory()
session = session_factory()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
def bulk_insert(session: Session, objects: list[T]) -> int:
"""Bulk insert objects into the database.
Args:
session: Active SQLAlchemy session.
objects: List of ORM model instances to insert.
Returns:
Number of objects inserted.
"""
session.add_all(objects)
session.flush()
return len(objects)
def upsert_by_key(
session: Session,
model_class: Any,
objects: list[T],
key_columns: list[str],
) -> tuple[int, int]:
"""Upsert objects based on unique key columns.
Args:
session: Active SQLAlchemy session.
model_class: The ORM model class.
objects: List of ORM model instances to upsert.
key_columns: Column names that form the unique key.
Returns:
Tuple of (inserted_count, updated_count).
"""
inserted = 0
updated = 0
for obj in objects:
# Build filter for existing record
filters = {col: getattr(obj, col) for col in key_columns}
existing = session.query(model_class).filter_by(**filters).first()
if existing:
# Update existing record
for column in model_class.__table__.columns:
if column.name not in key_columns and column.name != "id":
setattr(existing, column.name, getattr(obj, column.name))
updated += 1
else:
# Insert new record
session.add(obj)
inserted += 1
session.flush()
return inserted, updated

View File

@@ -0,0 +1,137 @@
"""Loader for CMHC rental data into fact_rentals."""
from sqlalchemy.orm import Session
from portfolio_app.toronto.models import DimCMHCZone, DimTime, FactRentals
from portfolio_app.toronto.schemas import CMHCAnnualSurvey, CMHCRentalRecord
from .base import get_session, upsert_by_key
from .dimensions import generate_date_key
def load_cmhc_rentals(
survey: CMHCAnnualSurvey,
session: Session | None = None,
) -> int:
"""Load CMHC annual survey data into fact_rentals.
Args:
survey: Validated CMHC annual survey containing records.
session: Optional existing session.
Returns:
Number of records loaded.
"""
from datetime import date
def _load(sess: Session) -> int:
# Get zone key mapping
zones = sess.query(DimCMHCZone).all()
zone_map = {z.zone_code: z.zone_key for z in zones}
# CMHC surveys are annual - use October 1st as reference date
survey_date = date(survey.survey_year, 10, 1)
date_key = generate_date_key(survey_date)
# Verify time dimension exists
time_dim = sess.query(DimTime).filter_by(date_key=date_key).first()
if not time_dim:
raise ValueError(
f"Time dimension not found for date_key {date_key}. "
"Load time dimension first."
)
records = []
for record in survey.records:
zone_key = zone_map.get(record.zone_code)
if not zone_key:
# Skip records for unknown zones
continue
fact = FactRentals(
date_key=date_key,
zone_key=zone_key,
bedroom_type=record.bedroom_type.value,
universe=record.universe,
avg_rent=record.average_rent,
median_rent=record.median_rent,
vacancy_rate=record.vacancy_rate,
availability_rate=record.availability_rate,
turnover_rate=record.turnover_rate,
rent_change_pct=record.rent_change_pct,
reliability_code=record.average_rent_reliability.value
if record.average_rent_reliability
else None,
)
records.append(fact)
inserted, updated = upsert_by_key(
sess, FactRentals, records, ["date_key", "zone_key", "bedroom_type"]
)
return inserted + updated
if session:
return _load(session)
with get_session() as sess:
return _load(sess)
def load_cmhc_record(
record: CMHCRentalRecord,
survey_year: int,
session: Session | None = None,
) -> int:
"""Load a single CMHC record into fact_rentals.
Args:
record: Single validated CMHC rental record.
survey_year: Year of the survey.
session: Optional existing session.
Returns:
Number of records loaded (0 or 1).
"""
from datetime import date
def _load(sess: Session) -> int:
# Get zone key
zone = sess.query(DimCMHCZone).filter_by(zone_code=record.zone_code).first()
if not zone:
return 0
survey_date = date(survey_year, 10, 1)
date_key = generate_date_key(survey_date)
# Verify time dimension exists
time_dim = sess.query(DimTime).filter_by(date_key=date_key).first()
if not time_dim:
raise ValueError(
f"Time dimension not found for date_key {date_key}. "
"Load time dimension first."
)
fact = FactRentals(
date_key=date_key,
zone_key=zone.zone_key,
bedroom_type=record.bedroom_type.value,
universe=record.universe,
avg_rent=record.average_rent,
median_rent=record.median_rent,
vacancy_rate=record.vacancy_rate,
availability_rate=record.availability_rate,
turnover_rate=record.turnover_rate,
rent_change_pct=record.rent_change_pct,
reliability_code=record.average_rent_reliability.value
if record.average_rent_reliability
else None,
)
inserted, updated = upsert_by_key(
sess, FactRentals, [fact], ["date_key", "zone_key", "bedroom_type"]
)
return inserted + updated
if session:
return _load(session)
with get_session() as sess:
return _load(sess)

View File

@@ -0,0 +1,251 @@
"""Loaders for dimension tables."""
from datetime import date
from sqlalchemy.orm import Session
from portfolio_app.toronto.models import (
DimCMHCZone,
DimNeighbourhood,
DimPolicyEvent,
DimTime,
DimTRREBDistrict,
)
from portfolio_app.toronto.schemas import (
CMHCZone,
Neighbourhood,
PolicyEvent,
TRREBDistrict,
)
from .base import get_session, upsert_by_key
def generate_date_key(d: date) -> int:
"""Generate integer date key from date (YYYYMMDD format).
Args:
d: Date to convert.
Returns:
Integer in YYYYMMDD format.
"""
return d.year * 10000 + d.month * 100 + d.day
def load_time_dimension(
start_date: date,
end_date: date,
session: Session | None = None,
) -> int:
"""Load time dimension with date range.
Args:
start_date: Start of date range.
end_date: End of date range (inclusive).
session: Optional existing session.
Returns:
Number of records loaded.
"""
month_names = [
"",
"January",
"February",
"March",
"April",
"May",
"June",
"July",
"August",
"September",
"October",
"November",
"December",
]
def _load(sess: Session) -> int:
records = []
current = start_date.replace(day=1) # Start at month beginning
while current <= end_date:
quarter = (current.month - 1) // 3 + 1
dim = DimTime(
date_key=generate_date_key(current),
full_date=current,
year=current.year,
month=current.month,
quarter=quarter,
month_name=month_names[current.month],
is_month_start=True,
)
records.append(dim)
# Move to next month
if current.month == 12:
current = current.replace(year=current.year + 1, month=1)
else:
current = current.replace(month=current.month + 1)
inserted, updated = upsert_by_key(sess, DimTime, records, ["date_key"])
return inserted + updated
if session:
return _load(session)
with get_session() as sess:
return _load(sess)
def load_trreb_districts(
districts: list[TRREBDistrict],
session: Session | None = None,
) -> int:
"""Load TRREB district dimension.
Args:
districts: List of validated district schemas.
session: Optional existing session.
Returns:
Number of records loaded.
"""
def _load(sess: Session) -> int:
records = []
for d in districts:
dim = DimTRREBDistrict(
district_code=d.district_code,
district_name=d.district_name,
area_type=d.area_type.value,
geometry=d.geometry_wkt,
)
records.append(dim)
inserted, updated = upsert_by_key(
sess, DimTRREBDistrict, records, ["district_code"]
)
return inserted + updated
if session:
return _load(session)
with get_session() as sess:
return _load(sess)
def load_cmhc_zones(
zones: list[CMHCZone],
session: Session | None = None,
) -> int:
"""Load CMHC zone dimension.
Args:
zones: List of validated zone schemas.
session: Optional existing session.
Returns:
Number of records loaded.
"""
def _load(sess: Session) -> int:
records = []
for z in zones:
dim = DimCMHCZone(
zone_code=z.zone_code,
zone_name=z.zone_name,
geometry=z.geometry_wkt,
)
records.append(dim)
inserted, updated = upsert_by_key(sess, DimCMHCZone, records, ["zone_code"])
return inserted + updated
if session:
return _load(session)
with get_session() as sess:
return _load(sess)
def load_neighbourhoods(
neighbourhoods: list[Neighbourhood],
session: Session | None = None,
) -> int:
"""Load neighbourhood dimension.
Args:
neighbourhoods: List of validated neighbourhood schemas.
session: Optional existing session.
Returns:
Number of records loaded.
"""
def _load(sess: Session) -> int:
records = []
for n in neighbourhoods:
dim = DimNeighbourhood(
neighbourhood_id=n.neighbourhood_id,
name=n.name,
geometry=n.geometry_wkt,
population=n.population,
land_area_sqkm=n.land_area_sqkm,
pop_density_per_sqkm=n.pop_density_per_sqkm,
pct_bachelors_or_higher=n.pct_bachelors_or_higher,
median_household_income=n.median_household_income,
pct_owner_occupied=n.pct_owner_occupied,
pct_renter_occupied=n.pct_renter_occupied,
census_year=n.census_year,
)
records.append(dim)
inserted, updated = upsert_by_key(
sess, DimNeighbourhood, records, ["neighbourhood_id"]
)
return inserted + updated
if session:
return _load(session)
with get_session() as sess:
return _load(sess)
def load_policy_events(
events: list[PolicyEvent],
session: Session | None = None,
) -> int:
"""Load policy event dimension.
Args:
events: List of validated policy event schemas.
session: Optional existing session.
Returns:
Number of records loaded.
"""
def _load(sess: Session) -> int:
records = []
for e in events:
dim = DimPolicyEvent(
event_date=e.event_date,
effective_date=e.effective_date,
level=e.level.value,
category=e.category.value,
title=e.title,
description=e.description,
expected_direction=e.expected_direction.value,
source_url=e.source_url,
confidence=e.confidence.value,
)
records.append(dim)
# For policy events, use event_date + title as unique key
inserted, updated = upsert_by_key(
sess, DimPolicyEvent, records, ["event_date", "title"]
)
return inserted + updated
if session:
return _load(session)
with get_session() as sess:
return _load(sess)

View File

@@ -0,0 +1,129 @@
"""Loader for TRREB purchase data into fact_purchases."""
from sqlalchemy.orm import Session
from portfolio_app.toronto.models import DimTime, DimTRREBDistrict, FactPurchases
from portfolio_app.toronto.schemas import TRREBMonthlyRecord, TRREBMonthlyReport
from .base import get_session, upsert_by_key
from .dimensions import generate_date_key
def load_trreb_purchases(
report: TRREBMonthlyReport,
session: Session | None = None,
) -> int:
"""Load TRREB monthly report data into fact_purchases.
Args:
report: Validated TRREB monthly report containing records.
session: Optional existing session.
Returns:
Number of records loaded.
"""
def _load(sess: Session) -> int:
# Get district key mapping
districts = sess.query(DimTRREBDistrict).all()
district_map = {d.district_code: d.district_key for d in districts}
# Build date key from report date
date_key = generate_date_key(report.report_date)
# Verify time dimension exists
time_dim = sess.query(DimTime).filter_by(date_key=date_key).first()
if not time_dim:
raise ValueError(
f"Time dimension not found for date_key {date_key}. "
"Load time dimension first."
)
records = []
for record in report.records:
district_key = district_map.get(record.area_code)
if not district_key:
# Skip records for unknown districts (e.g., aggregate rows)
continue
fact = FactPurchases(
date_key=date_key,
district_key=district_key,
sales_count=record.sales,
dollar_volume=record.dollar_volume,
avg_price=record.avg_price,
median_price=record.median_price,
new_listings=record.new_listings,
active_listings=record.active_listings,
avg_dom=record.avg_dom,
avg_sp_lp=record.avg_sp_lp,
)
records.append(fact)
inserted, updated = upsert_by_key(
sess, FactPurchases, records, ["date_key", "district_key"]
)
return inserted + updated
if session:
return _load(session)
with get_session() as sess:
return _load(sess)
def load_trreb_record(
record: TRREBMonthlyRecord,
session: Session | None = None,
) -> int:
"""Load a single TRREB record into fact_purchases.
Args:
record: Single validated TRREB monthly record.
session: Optional existing session.
Returns:
Number of records loaded (0 or 1).
"""
def _load(sess: Session) -> int:
# Get district key
district = (
sess.query(DimTRREBDistrict)
.filter_by(district_code=record.area_code)
.first()
)
if not district:
return 0
date_key = generate_date_key(record.report_date)
# Verify time dimension exists
time_dim = sess.query(DimTime).filter_by(date_key=date_key).first()
if not time_dim:
raise ValueError(
f"Time dimension not found for date_key {date_key}. "
"Load time dimension first."
)
fact = FactPurchases(
date_key=date_key,
district_key=district.district_key,
sales_count=record.sales,
dollar_volume=record.dollar_volume,
avg_price=record.avg_price,
median_price=record.median_price,
new_listings=record.new_listings,
active_listings=record.active_listings,
avg_dom=record.avg_dom,
avg_sp_lp=record.avg_sp_lp,
)
inserted, updated = upsert_by_key(
sess, FactPurchases, [fact], ["date_key", "district_key"]
)
return inserted + updated
if session:
return _load(session)
with get_session() as sess:
return _load(sess)