From 457bb493958507eecf2b6167b62000d347f94cd0 Mon Sep 17 00:00:00 2001 From: lmiranda Date: Sun, 11 Jan 2026 16:07:30 -0500 Subject: [PATCH] feat: add loaders and dbt models for Toronto housing data Sprint 4 implementation: Loaders: - base.py: Session management, bulk insert, upsert utilities - dimensions.py: Load time, district, zone, neighbourhood, policy dimensions - trreb.py: Load TRREB purchase data to fact_purchases - cmhc.py: Load CMHC rental data to fact_rentals dbt Project: - Project configuration (dbt_project.yml, packages.yml) - Staging models for all fact and dimension tables - Intermediate models with dimension enrichment - Marts: purchase analysis, rental analysis, market summary Closes #16 Co-Authored-By: Claude Opus 4.5 --- dbt/dbt_project.yml | 28 ++ dbt/models/intermediate/_intermediate.yml | 24 ++ .../intermediate/int_purchases__monthly.sql | 62 +++++ .../intermediate/int_rentals__annual.sql | 57 ++++ dbt/models/marts/_marts.yml | 23 ++ .../marts/mart_toronto_market_summary.sql | 81 ++++++ dbt/models/marts/mart_toronto_purchases.sql | 79 ++++++ dbt/models/marts/mart_toronto_rentals.sql | 64 +++++ dbt/models/staging/_sources.yml | 61 +++++ dbt/models/staging/_staging.yml | 73 +++++ dbt/models/staging/stg_cmhc__rentals.sql | 26 ++ .../staging/stg_dimensions__cmhc_zones.sql | 18 ++ dbt/models/staging/stg_dimensions__time.sql | 21 ++ .../stg_dimensions__trreb_districts.sql | 19 ++ dbt/models/staging/stg_trreb__purchases.sql | 25 ++ dbt/packages.yml | 5 + dbt/profiles.yml.example | 21 ++ portfolio_app/toronto/loaders/__init__.py | 31 +++ portfolio_app/toronto/loaders/base.py | 85 ++++++ portfolio_app/toronto/loaders/cmhc.py | 137 ++++++++++ portfolio_app/toronto/loaders/dimensions.py | 251 ++++++++++++++++++ portfolio_app/toronto/loaders/trreb.py | 129 +++++++++ 22 files changed, 1320 insertions(+) create mode 100644 dbt/dbt_project.yml create mode 100644 dbt/models/intermediate/_intermediate.yml create mode 100644 dbt/models/intermediate/int_purchases__monthly.sql create mode 100644 dbt/models/intermediate/int_rentals__annual.sql create mode 100644 dbt/models/marts/_marts.yml create mode 100644 dbt/models/marts/mart_toronto_market_summary.sql create mode 100644 dbt/models/marts/mart_toronto_purchases.sql create mode 100644 dbt/models/marts/mart_toronto_rentals.sql create mode 100644 dbt/models/staging/_sources.yml create mode 100644 dbt/models/staging/_staging.yml create mode 100644 dbt/models/staging/stg_cmhc__rentals.sql create mode 100644 dbt/models/staging/stg_dimensions__cmhc_zones.sql create mode 100644 dbt/models/staging/stg_dimensions__time.sql create mode 100644 dbt/models/staging/stg_dimensions__trreb_districts.sql create mode 100644 dbt/models/staging/stg_trreb__purchases.sql create mode 100644 dbt/packages.yml create mode 100644 dbt/profiles.yml.example create mode 100644 portfolio_app/toronto/loaders/base.py create mode 100644 portfolio_app/toronto/loaders/cmhc.py create mode 100644 portfolio_app/toronto/loaders/dimensions.py create mode 100644 portfolio_app/toronto/loaders/trreb.py diff --git a/dbt/dbt_project.yml b/dbt/dbt_project.yml new file mode 100644 index 0000000..473d75c --- /dev/null +++ b/dbt/dbt_project.yml @@ -0,0 +1,28 @@ +name: 'toronto_housing' +version: '1.0.0' +config-version: 2 + +profile: 'toronto_housing' + +model-paths: ["models"] +analysis-paths: ["analyses"] +test-paths: ["tests"] +seed-paths: ["seeds"] +macro-paths: ["macros"] +snapshot-paths: ["snapshots"] + +clean-targets: + - "target" + - "dbt_packages" + +models: + toronto_housing: + staging: + +materialized: view + +schema: staging + intermediate: + +materialized: view + +schema: intermediate + marts: + +materialized: table + +schema: marts diff --git a/dbt/models/intermediate/_intermediate.yml b/dbt/models/intermediate/_intermediate.yml new file mode 100644 index 0000000..ab62db8 --- /dev/null +++ b/dbt/models/intermediate/_intermediate.yml @@ -0,0 +1,24 @@ +version: 2 + +models: + - name: int_purchases__monthly + description: "Purchase data enriched with time and district dimensions" + columns: + - name: purchase_id + tests: + - unique + - not_null + - name: district_code + tests: + - not_null + + - name: int_rentals__annual + description: "Rental data enriched with time and zone dimensions" + columns: + - name: rental_id + tests: + - unique + - not_null + - name: zone_code + tests: + - not_null diff --git a/dbt/models/intermediate/int_purchases__monthly.sql b/dbt/models/intermediate/int_purchases__monthly.sql new file mode 100644 index 0000000..b03f5d8 --- /dev/null +++ b/dbt/models/intermediate/int_purchases__monthly.sql @@ -0,0 +1,62 @@ +-- Intermediate: Monthly purchase data enriched with dimensions +-- Joins purchases with time and district dimensions for analysis + +with purchases as ( + select * from {{ ref('stg_trreb__purchases') }} +), + +time_dim as ( + select * from {{ ref('stg_dimensions__time') }} +), + +district_dim as ( + select * from {{ ref('stg_dimensions__trreb_districts') }} +), + +enriched as ( + select + p.purchase_id, + + -- Time attributes + t.date_key, + t.full_date, + t.year, + t.month, + t.quarter, + t.month_name, + + -- District attributes + d.district_key, + d.district_code, + d.district_name, + d.area_type, + + -- Metrics + p.sales_count, + p.dollar_volume, + p.avg_price, + p.median_price, + p.new_listings, + p.active_listings, + p.days_on_market, + p.sale_to_list_ratio, + + -- Calculated metrics + case + when p.active_listings > 0 + then round(p.sales_count::numeric / p.active_listings, 3) + else null + end as absorption_rate, + + case + when p.sales_count > 0 + then round(p.active_listings::numeric / p.sales_count, 1) + else null + end as months_of_inventory + + from purchases p + inner join time_dim t on p.date_key = t.date_key + inner join district_dim d on p.district_key = d.district_key +) + +select * from enriched diff --git a/dbt/models/intermediate/int_rentals__annual.sql b/dbt/models/intermediate/int_rentals__annual.sql new file mode 100644 index 0000000..cd9fc87 --- /dev/null +++ b/dbt/models/intermediate/int_rentals__annual.sql @@ -0,0 +1,57 @@ +-- Intermediate: Annual rental data enriched with dimensions +-- Joins rentals with time and zone dimensions for analysis + +with rentals as ( + select * from {{ ref('stg_cmhc__rentals') }} +), + +time_dim as ( + select * from {{ ref('stg_dimensions__time') }} +), + +zone_dim as ( + select * from {{ ref('stg_dimensions__cmhc_zones') }} +), + +enriched as ( + select + r.rental_id, + + -- Time attributes + t.date_key, + t.full_date, + t.year, + t.month, + t.quarter, + + -- Zone attributes + z.zone_key, + z.zone_code, + z.zone_name, + + -- Bedroom type + r.bedroom_type, + + -- Metrics + r.rental_universe, + r.avg_rent, + r.median_rent, + r.vacancy_rate, + r.availability_rate, + r.turnover_rate, + r.year_over_year_rent_change, + r.reliability_code, + + -- Calculated metrics + case + when r.rental_universe > 0 and r.vacancy_rate is not null + then round(r.rental_universe * (r.vacancy_rate / 100), 0) + else null + end as vacant_units_estimate + + from rentals r + inner join time_dim t on r.date_key = t.date_key + inner join zone_dim z on r.zone_key = z.zone_key +) + +select * from enriched diff --git a/dbt/models/marts/_marts.yml b/dbt/models/marts/_marts.yml new file mode 100644 index 0000000..b6419a2 --- /dev/null +++ b/dbt/models/marts/_marts.yml @@ -0,0 +1,23 @@ +version: 2 + +models: + - name: mart_toronto_purchases + description: "Final mart for Toronto purchase/sales analysis by district and time" + columns: + - name: purchase_id + description: "Unique purchase record identifier" + tests: + - unique + - not_null + + - name: mart_toronto_rentals + description: "Final mart for Toronto rental market analysis by zone and time" + columns: + - name: rental_id + description: "Unique rental record identifier" + tests: + - unique + - not_null + + - name: mart_toronto_market_summary + description: "Combined market summary aggregating purchases and rentals at Toronto level" diff --git a/dbt/models/marts/mart_toronto_market_summary.sql b/dbt/models/marts/mart_toronto_market_summary.sql new file mode 100644 index 0000000..cec3c77 --- /dev/null +++ b/dbt/models/marts/mart_toronto_market_summary.sql @@ -0,0 +1,81 @@ +-- Mart: Toronto Market Summary +-- Aggregated view combining purchase and rental market indicators +-- Grain: One row per year-month + +with purchases_agg as ( + select + year, + month, + month_name, + quarter, + + -- Aggregate purchase metrics across all districts + sum(sales_count) as total_sales, + sum(dollar_volume) as total_dollar_volume, + round(avg(avg_price), 0) as avg_price_all_districts, + round(avg(median_price), 0) as median_price_all_districts, + sum(new_listings) as total_new_listings, + sum(active_listings) as total_active_listings, + round(avg(days_on_market), 0) as avg_days_on_market, + round(avg(sale_to_list_ratio), 2) as avg_sale_to_list_ratio, + round(avg(absorption_rate), 3) as avg_absorption_rate, + round(avg(months_of_inventory), 1) as avg_months_of_inventory, + round(avg(avg_price_yoy_pct), 2) as avg_price_yoy_pct + + from {{ ref('mart_toronto_purchases') }} + group by year, month, month_name, quarter +), + +rentals_agg as ( + select + year, + + -- Aggregate rental metrics across all zones (all bedroom types) + round(avg(avg_rent), 0) as avg_rent_all_zones, + round(avg(vacancy_rate), 2) as avg_vacancy_rate, + round(avg(rent_change_pct), 2) as avg_rent_change_pct, + sum(rental_universe) as total_rental_universe + + from {{ ref('mart_toronto_rentals') }} + group by year +), + +final as ( + select + p.year, + p.month, + p.month_name, + p.quarter, + + -- Purchase market indicators + p.total_sales, + p.total_dollar_volume, + p.avg_price_all_districts, + p.median_price_all_districts, + p.total_new_listings, + p.total_active_listings, + p.avg_days_on_market, + p.avg_sale_to_list_ratio, + p.avg_absorption_rate, + p.avg_months_of_inventory, + p.avg_price_yoy_pct, + + -- Rental market indicators (annual, so join on year) + r.avg_rent_all_zones, + r.avg_vacancy_rate, + r.avg_rent_change_pct, + r.total_rental_universe, + + -- Affordability indicator (price to rent ratio) + case + when r.avg_rent_all_zones > 0 + then round(p.avg_price_all_districts / (r.avg_rent_all_zones * 12), 1) + else null + end as price_to_annual_rent_ratio + + from purchases_agg p + left join rentals_agg r on p.year = r.year +) + +select * from final +order by year desc, month desc diff --git a/dbt/models/marts/mart_toronto_purchases.sql b/dbt/models/marts/mart_toronto_purchases.sql new file mode 100644 index 0000000..80c5766 --- /dev/null +++ b/dbt/models/marts/mart_toronto_purchases.sql @@ -0,0 +1,79 @@ +-- Mart: Toronto Purchase Market Analysis +-- Final analytical table for purchase/sales data visualization +-- Grain: One row per district per month + +with purchases as ( + select * from {{ ref('int_purchases__monthly') }} +), + +-- Add year-over-year calculations +with_yoy as ( + select + p.*, + + -- Previous year same month values + lag(p.avg_price, 12) over ( + partition by p.district_code + order by p.date_key + ) as avg_price_prev_year, + + lag(p.sales_count, 12) over ( + partition by p.district_code + order by p.date_key + ) as sales_count_prev_year, + + lag(p.median_price, 12) over ( + partition by p.district_code + order by p.date_key + ) as median_price_prev_year + + from purchases p +), + +final as ( + select + purchase_id, + date_key, + full_date, + year, + month, + quarter, + month_name, + district_key, + district_code, + district_name, + area_type, + sales_count, + dollar_volume, + avg_price, + median_price, + new_listings, + active_listings, + days_on_market, + sale_to_list_ratio, + absorption_rate, + months_of_inventory, + + -- Year-over-year changes + case + when avg_price_prev_year > 0 + then round(((avg_price - avg_price_prev_year) / avg_price_prev_year) * 100, 2) + else null + end as avg_price_yoy_pct, + + case + when sales_count_prev_year > 0 + then round(((sales_count - sales_count_prev_year)::numeric / sales_count_prev_year) * 100, 2) + else null + end as sales_count_yoy_pct, + + case + when median_price_prev_year > 0 + then round(((median_price - median_price_prev_year) / median_price_prev_year) * 100, 2) + else null + end as median_price_yoy_pct + + from with_yoy +) + +select * from final diff --git a/dbt/models/marts/mart_toronto_rentals.sql b/dbt/models/marts/mart_toronto_rentals.sql new file mode 100644 index 0000000..1785933 --- /dev/null +++ b/dbt/models/marts/mart_toronto_rentals.sql @@ -0,0 +1,64 @@ +-- Mart: Toronto Rental Market Analysis +-- Final analytical table for rental market visualization +-- Grain: One row per zone per bedroom type per survey year + +with rentals as ( + select * from {{ ref('int_rentals__annual') }} +), + +-- Add year-over-year calculations +with_yoy as ( + select + r.*, + + -- Previous year values + lag(r.avg_rent, 1) over ( + partition by r.zone_code, r.bedroom_type + order by r.year + ) as avg_rent_prev_year, + + lag(r.vacancy_rate, 1) over ( + partition by r.zone_code, r.bedroom_type + order by r.year + ) as vacancy_rate_prev_year + + from rentals r +), + +final as ( + select + rental_id, + date_key, + full_date, + year, + quarter, + zone_key, + zone_code, + zone_name, + bedroom_type, + rental_universe, + avg_rent, + median_rent, + vacancy_rate, + availability_rate, + turnover_rate, + year_over_year_rent_change, + reliability_code, + vacant_units_estimate, + + -- Calculated year-over-year (if not provided) + coalesce( + year_over_year_rent_change, + case + when avg_rent_prev_year > 0 + then round(((avg_rent - avg_rent_prev_year) / avg_rent_prev_year) * 100, 2) + else null + end + ) as rent_change_pct, + + vacancy_rate - vacancy_rate_prev_year as vacancy_rate_change + + from with_yoy +) + +select * from final diff --git a/dbt/models/staging/_sources.yml b/dbt/models/staging/_sources.yml new file mode 100644 index 0000000..ff92376 --- /dev/null +++ b/dbt/models/staging/_sources.yml @@ -0,0 +1,61 @@ +version: 2 + +sources: + - name: toronto_housing + description: "Toronto housing data loaded from TRREB and CMHC sources" + database: portfolio + schema: public + tables: + - name: fact_purchases + description: "TRREB monthly purchase/sales statistics by district" + columns: + - name: id + description: "Primary key" + - name: date_key + description: "Foreign key to dim_time" + - name: district_key + description: "Foreign key to dim_trreb_district" + + - name: fact_rentals + description: "CMHC annual rental survey data by zone and bedroom type" + columns: + - name: id + description: "Primary key" + - name: date_key + description: "Foreign key to dim_time" + - name: zone_key + description: "Foreign key to dim_cmhc_zone" + + - name: dim_time + description: "Time dimension (monthly grain)" + columns: + - name: date_key + description: "Primary key (YYYYMMDD format)" + + - name: dim_trreb_district + description: "TRREB district dimension with geometry" + columns: + - name: district_key + description: "Primary key" + - name: district_code + description: "TRREB district code" + + - name: dim_cmhc_zone + description: "CMHC zone dimension with geometry" + columns: + - name: zone_key + description: "Primary key" + - name: zone_code + description: "CMHC zone code" + + - name: dim_neighbourhood + description: "City of Toronto neighbourhoods (reference only)" + columns: + - name: neighbourhood_id + description: "Primary key" + + - name: dim_policy_event + description: "Housing policy events for annotation" + columns: + - name: event_id + description: "Primary key" diff --git a/dbt/models/staging/_staging.yml b/dbt/models/staging/_staging.yml new file mode 100644 index 0000000..a3458a6 --- /dev/null +++ b/dbt/models/staging/_staging.yml @@ -0,0 +1,73 @@ +version: 2 + +models: + - name: stg_trreb__purchases + description: "Staged TRREB purchase/sales data from fact_purchases" + columns: + - name: purchase_id + description: "Unique identifier for purchase record" + tests: + - unique + - not_null + - name: date_key + description: "Date dimension key (YYYYMMDD)" + tests: + - not_null + - name: district_key + description: "TRREB district dimension key" + tests: + - not_null + + - name: stg_cmhc__rentals + description: "Staged CMHC rental market data from fact_rentals" + columns: + - name: rental_id + description: "Unique identifier for rental record" + tests: + - unique + - not_null + - name: date_key + description: "Date dimension key (YYYYMMDD)" + tests: + - not_null + - name: zone_key + description: "CMHC zone dimension key" + tests: + - not_null + + - name: stg_dimensions__time + description: "Staged time dimension" + columns: + - name: date_key + description: "Date dimension key (YYYYMMDD)" + tests: + - unique + - not_null + + - name: stg_dimensions__trreb_districts + description: "Staged TRREB district dimension" + columns: + - name: district_key + description: "District dimension key" + tests: + - unique + - not_null + - name: district_code + description: "TRREB district code (e.g., W01, C01)" + tests: + - unique + - not_null + + - name: stg_dimensions__cmhc_zones + description: "Staged CMHC zone dimension" + columns: + - name: zone_key + description: "Zone dimension key" + tests: + - unique + - not_null + - name: zone_code + description: "CMHC zone code" + tests: + - unique + - not_null diff --git a/dbt/models/staging/stg_cmhc__rentals.sql b/dbt/models/staging/stg_cmhc__rentals.sql new file mode 100644 index 0000000..2faa941 --- /dev/null +++ b/dbt/models/staging/stg_cmhc__rentals.sql @@ -0,0 +1,26 @@ +-- Staged CMHC rental market survey data +-- Source: fact_rentals table loaded from CMHC CSV exports +-- Grain: One row per zone per bedroom type per survey year + +with source as ( + select * from {{ source('toronto_housing', 'fact_rentals') }} +), + +staged as ( + select + id as rental_id, + date_key, + zone_key, + bedroom_type, + universe as rental_universe, + avg_rent, + median_rent, + vacancy_rate, + availability_rate, + turnover_rate, + rent_change_pct as year_over_year_rent_change, + reliability_code + from source +) + +select * from staged diff --git a/dbt/models/staging/stg_dimensions__cmhc_zones.sql b/dbt/models/staging/stg_dimensions__cmhc_zones.sql new file mode 100644 index 0000000..6ef3344 --- /dev/null +++ b/dbt/models/staging/stg_dimensions__cmhc_zones.sql @@ -0,0 +1,18 @@ +-- Staged CMHC zone dimension +-- Source: dim_cmhc_zone table +-- Grain: One row per zone + +with source as ( + select * from {{ source('toronto_housing', 'dim_cmhc_zone') }} +), + +staged as ( + select + zone_key, + zone_code, + zone_name, + geometry + from source +) + +select * from staged diff --git a/dbt/models/staging/stg_dimensions__time.sql b/dbt/models/staging/stg_dimensions__time.sql new file mode 100644 index 0000000..c693af9 --- /dev/null +++ b/dbt/models/staging/stg_dimensions__time.sql @@ -0,0 +1,21 @@ +-- Staged time dimension +-- Source: dim_time table +-- Grain: One row per month + +with source as ( + select * from {{ source('toronto_housing', 'dim_time') }} +), + +staged as ( + select + date_key, + full_date, + year, + month, + quarter, + month_name, + is_month_start + from source +) + +select * from staged diff --git a/dbt/models/staging/stg_dimensions__trreb_districts.sql b/dbt/models/staging/stg_dimensions__trreb_districts.sql new file mode 100644 index 0000000..c0e5dc6 --- /dev/null +++ b/dbt/models/staging/stg_dimensions__trreb_districts.sql @@ -0,0 +1,19 @@ +-- Staged TRREB district dimension +-- Source: dim_trreb_district table +-- Grain: One row per district + +with source as ( + select * from {{ source('toronto_housing', 'dim_trreb_district') }} +), + +staged as ( + select + district_key, + district_code, + district_name, + area_type, + geometry + from source +) + +select * from staged diff --git a/dbt/models/staging/stg_trreb__purchases.sql b/dbt/models/staging/stg_trreb__purchases.sql new file mode 100644 index 0000000..3694d71 --- /dev/null +++ b/dbt/models/staging/stg_trreb__purchases.sql @@ -0,0 +1,25 @@ +-- Staged TRREB purchase/sales data +-- Source: fact_purchases table loaded from TRREB Market Watch PDFs +-- Grain: One row per district per month + +with source as ( + select * from {{ source('toronto_housing', 'fact_purchases') }} +), + +staged as ( + select + id as purchase_id, + date_key, + district_key, + sales_count, + dollar_volume, + avg_price, + median_price, + new_listings, + active_listings, + avg_dom as days_on_market, + avg_sp_lp as sale_to_list_ratio + from source +) + +select * from staged diff --git a/dbt/packages.yml b/dbt/packages.yml new file mode 100644 index 0000000..52f4efb --- /dev/null +++ b/dbt/packages.yml @@ -0,0 +1,5 @@ +packages: + - package: dbt-labs/dbt_utils + version: ">=1.0.0" + - package: calogica/dbt_expectations + version: ">=0.10.0" diff --git a/dbt/profiles.yml.example b/dbt/profiles.yml.example new file mode 100644 index 0000000..360fbc2 --- /dev/null +++ b/dbt/profiles.yml.example @@ -0,0 +1,21 @@ +toronto_housing: + target: dev + outputs: + dev: + type: postgres + host: localhost + user: portfolio + password: "{{ env_var('POSTGRES_PASSWORD') }}" + port: 5432 + dbname: portfolio + schema: public + threads: 4 + prod: + type: postgres + host: "{{ env_var('POSTGRES_HOST') }}" + user: "{{ env_var('POSTGRES_USER') }}" + password: "{{ env_var('POSTGRES_PASSWORD') }}" + port: 5432 + dbname: portfolio + schema: public + threads: 4 diff --git a/portfolio_app/toronto/loaders/__init__.py b/portfolio_app/toronto/loaders/__init__.py index b55ea0b..1b47b25 100644 --- a/portfolio_app/toronto/loaders/__init__.py +++ b/portfolio_app/toronto/loaders/__init__.py @@ -1 +1,32 @@ """Database loaders for Toronto housing data.""" + +from .base import bulk_insert, get_session, upsert_by_key +from .cmhc import load_cmhc_record, load_cmhc_rentals +from .dimensions import ( + generate_date_key, + load_cmhc_zones, + load_neighbourhoods, + load_policy_events, + load_time_dimension, + load_trreb_districts, +) +from .trreb import load_trreb_purchases, load_trreb_record + +__all__ = [ + # Base utilities + "get_session", + "bulk_insert", + "upsert_by_key", + # Dimension loaders + "generate_date_key", + "load_time_dimension", + "load_trreb_districts", + "load_cmhc_zones", + "load_neighbourhoods", + "load_policy_events", + # Fact loaders + "load_trreb_purchases", + "load_trreb_record", + "load_cmhc_rentals", + "load_cmhc_record", +] diff --git a/portfolio_app/toronto/loaders/base.py b/portfolio_app/toronto/loaders/base.py new file mode 100644 index 0000000..2ddaf1e --- /dev/null +++ b/portfolio_app/toronto/loaders/base.py @@ -0,0 +1,85 @@ +"""Base loader utilities for database operations.""" + +from collections.abc import Generator +from contextlib import contextmanager +from typing import Any, TypeVar + +from sqlalchemy.orm import Session + +from portfolio_app.toronto.models import get_session_factory + +T = TypeVar("T") + + +@contextmanager +def get_session() -> Generator[Session, None, None]: + """Get a database session with automatic cleanup. + + Yields: + SQLAlchemy session that auto-commits on success, rollbacks on error. + """ + session_factory = get_session_factory() + session = session_factory() + try: + yield session + session.commit() + except Exception: + session.rollback() + raise + finally: + session.close() + + +def bulk_insert(session: Session, objects: list[T]) -> int: + """Bulk insert objects into the database. + + Args: + session: Active SQLAlchemy session. + objects: List of ORM model instances to insert. + + Returns: + Number of objects inserted. + """ + session.add_all(objects) + session.flush() + return len(objects) + + +def upsert_by_key( + session: Session, + model_class: Any, + objects: list[T], + key_columns: list[str], +) -> tuple[int, int]: + """Upsert objects based on unique key columns. + + Args: + session: Active SQLAlchemy session. + model_class: The ORM model class. + objects: List of ORM model instances to upsert. + key_columns: Column names that form the unique key. + + Returns: + Tuple of (inserted_count, updated_count). + """ + inserted = 0 + updated = 0 + + for obj in objects: + # Build filter for existing record + filters = {col: getattr(obj, col) for col in key_columns} + existing = session.query(model_class).filter_by(**filters).first() + + if existing: + # Update existing record + for column in model_class.__table__.columns: + if column.name not in key_columns and column.name != "id": + setattr(existing, column.name, getattr(obj, column.name)) + updated += 1 + else: + # Insert new record + session.add(obj) + inserted += 1 + + session.flush() + return inserted, updated diff --git a/portfolio_app/toronto/loaders/cmhc.py b/portfolio_app/toronto/loaders/cmhc.py new file mode 100644 index 0000000..09691be --- /dev/null +++ b/portfolio_app/toronto/loaders/cmhc.py @@ -0,0 +1,137 @@ +"""Loader for CMHC rental data into fact_rentals.""" + +from sqlalchemy.orm import Session + +from portfolio_app.toronto.models import DimCMHCZone, DimTime, FactRentals +from portfolio_app.toronto.schemas import CMHCAnnualSurvey, CMHCRentalRecord + +from .base import get_session, upsert_by_key +from .dimensions import generate_date_key + + +def load_cmhc_rentals( + survey: CMHCAnnualSurvey, + session: Session | None = None, +) -> int: + """Load CMHC annual survey data into fact_rentals. + + Args: + survey: Validated CMHC annual survey containing records. + session: Optional existing session. + + Returns: + Number of records loaded. + """ + from datetime import date + + def _load(sess: Session) -> int: + # Get zone key mapping + zones = sess.query(DimCMHCZone).all() + zone_map = {z.zone_code: z.zone_key for z in zones} + + # CMHC surveys are annual - use October 1st as reference date + survey_date = date(survey.survey_year, 10, 1) + date_key = generate_date_key(survey_date) + + # Verify time dimension exists + time_dim = sess.query(DimTime).filter_by(date_key=date_key).first() + if not time_dim: + raise ValueError( + f"Time dimension not found for date_key {date_key}. " + "Load time dimension first." + ) + + records = [] + for record in survey.records: + zone_key = zone_map.get(record.zone_code) + if not zone_key: + # Skip records for unknown zones + continue + + fact = FactRentals( + date_key=date_key, + zone_key=zone_key, + bedroom_type=record.bedroom_type.value, + universe=record.universe, + avg_rent=record.average_rent, + median_rent=record.median_rent, + vacancy_rate=record.vacancy_rate, + availability_rate=record.availability_rate, + turnover_rate=record.turnover_rate, + rent_change_pct=record.rent_change_pct, + reliability_code=record.average_rent_reliability.value + if record.average_rent_reliability + else None, + ) + records.append(fact) + + inserted, updated = upsert_by_key( + sess, FactRentals, records, ["date_key", "zone_key", "bedroom_type"] + ) + return inserted + updated + + if session: + return _load(session) + with get_session() as sess: + return _load(sess) + + +def load_cmhc_record( + record: CMHCRentalRecord, + survey_year: int, + session: Session | None = None, +) -> int: + """Load a single CMHC record into fact_rentals. + + Args: + record: Single validated CMHC rental record. + survey_year: Year of the survey. + session: Optional existing session. + + Returns: + Number of records loaded (0 or 1). + """ + from datetime import date + + def _load(sess: Session) -> int: + # Get zone key + zone = sess.query(DimCMHCZone).filter_by(zone_code=record.zone_code).first() + if not zone: + return 0 + + survey_date = date(survey_year, 10, 1) + date_key = generate_date_key(survey_date) + + # Verify time dimension exists + time_dim = sess.query(DimTime).filter_by(date_key=date_key).first() + if not time_dim: + raise ValueError( + f"Time dimension not found for date_key {date_key}. " + "Load time dimension first." + ) + + fact = FactRentals( + date_key=date_key, + zone_key=zone.zone_key, + bedroom_type=record.bedroom_type.value, + universe=record.universe, + avg_rent=record.average_rent, + median_rent=record.median_rent, + vacancy_rate=record.vacancy_rate, + availability_rate=record.availability_rate, + turnover_rate=record.turnover_rate, + rent_change_pct=record.rent_change_pct, + reliability_code=record.average_rent_reliability.value + if record.average_rent_reliability + else None, + ) + + inserted, updated = upsert_by_key( + sess, FactRentals, [fact], ["date_key", "zone_key", "bedroom_type"] + ) + return inserted + updated + + if session: + return _load(session) + with get_session() as sess: + return _load(sess) diff --git a/portfolio_app/toronto/loaders/dimensions.py b/portfolio_app/toronto/loaders/dimensions.py new file mode 100644 index 0000000..c69424f --- /dev/null +++ b/portfolio_app/toronto/loaders/dimensions.py @@ -0,0 +1,251 @@ +"""Loaders for dimension tables.""" + +from datetime import date + +from sqlalchemy.orm import Session + +from portfolio_app.toronto.models import ( + DimCMHCZone, + DimNeighbourhood, + DimPolicyEvent, + DimTime, + DimTRREBDistrict, +) +from portfolio_app.toronto.schemas import ( + CMHCZone, + Neighbourhood, + PolicyEvent, + TRREBDistrict, +) + +from .base import get_session, upsert_by_key + + +def generate_date_key(d: date) -> int: + """Generate integer date key from date (YYYYMMDD format). + + Args: + d: Date to convert. + + Returns: + Integer in YYYYMMDD format. + """ + return d.year * 10000 + d.month * 100 + d.day + + +def load_time_dimension( + start_date: date, + end_date: date, + session: Session | None = None, +) -> int: + """Load time dimension with date range. + + Args: + start_date: Start of date range. + end_date: End of date range (inclusive). + session: Optional existing session. + + Returns: + Number of records loaded. + """ + + month_names = [ + "", + "January", + "February", + "March", + "April", + "May", + "June", + "July", + "August", + "September", + "October", + "November", + "December", + ] + + def _load(sess: Session) -> int: + records = [] + current = start_date.replace(day=1) # Start at month beginning + + while current <= end_date: + quarter = (current.month - 1) // 3 + 1 + dim = DimTime( + date_key=generate_date_key(current), + full_date=current, + year=current.year, + month=current.month, + quarter=quarter, + month_name=month_names[current.month], + is_month_start=True, + ) + records.append(dim) + + # Move to next month + if current.month == 12: + current = current.replace(year=current.year + 1, month=1) + else: + current = current.replace(month=current.month + 1) + + inserted, updated = upsert_by_key(sess, DimTime, records, ["date_key"]) + return inserted + updated + + if session: + return _load(session) + with get_session() as sess: + return _load(sess) + + +def load_trreb_districts( + districts: list[TRREBDistrict], + session: Session | None = None, +) -> int: + """Load TRREB district dimension. + + Args: + districts: List of validated district schemas. + session: Optional existing session. + + Returns: + Number of records loaded. + """ + + def _load(sess: Session) -> int: + records = [] + for d in districts: + dim = DimTRREBDistrict( + district_code=d.district_code, + district_name=d.district_name, + area_type=d.area_type.value, + geometry=d.geometry_wkt, + ) + records.append(dim) + + inserted, updated = upsert_by_key( + sess, DimTRREBDistrict, records, ["district_code"] + ) + return inserted + updated + + if session: + return _load(session) + with get_session() as sess: + return _load(sess) + + +def load_cmhc_zones( + zones: list[CMHCZone], + session: Session | None = None, +) -> int: + """Load CMHC zone dimension. + + Args: + zones: List of validated zone schemas. + session: Optional existing session. + + Returns: + Number of records loaded. + """ + + def _load(sess: Session) -> int: + records = [] + for z in zones: + dim = DimCMHCZone( + zone_code=z.zone_code, + zone_name=z.zone_name, + geometry=z.geometry_wkt, + ) + records.append(dim) + + inserted, updated = upsert_by_key(sess, DimCMHCZone, records, ["zone_code"]) + return inserted + updated + + if session: + return _load(session) + with get_session() as sess: + return _load(sess) + + +def load_neighbourhoods( + neighbourhoods: list[Neighbourhood], + session: Session | None = None, +) -> int: + """Load neighbourhood dimension. + + Args: + neighbourhoods: List of validated neighbourhood schemas. + session: Optional existing session. + + Returns: + Number of records loaded. + """ + + def _load(sess: Session) -> int: + records = [] + for n in neighbourhoods: + dim = DimNeighbourhood( + neighbourhood_id=n.neighbourhood_id, + name=n.name, + geometry=n.geometry_wkt, + population=n.population, + land_area_sqkm=n.land_area_sqkm, + pop_density_per_sqkm=n.pop_density_per_sqkm, + pct_bachelors_or_higher=n.pct_bachelors_or_higher, + median_household_income=n.median_household_income, + pct_owner_occupied=n.pct_owner_occupied, + pct_renter_occupied=n.pct_renter_occupied, + census_year=n.census_year, + ) + records.append(dim) + + inserted, updated = upsert_by_key( + sess, DimNeighbourhood, records, ["neighbourhood_id"] + ) + return inserted + updated + + if session: + return _load(session) + with get_session() as sess: + return _load(sess) + + +def load_policy_events( + events: list[PolicyEvent], + session: Session | None = None, +) -> int: + """Load policy event dimension. + + Args: + events: List of validated policy event schemas. + session: Optional existing session. + + Returns: + Number of records loaded. + """ + + def _load(sess: Session) -> int: + records = [] + for e in events: + dim = DimPolicyEvent( + event_date=e.event_date, + effective_date=e.effective_date, + level=e.level.value, + category=e.category.value, + title=e.title, + description=e.description, + expected_direction=e.expected_direction.value, + source_url=e.source_url, + confidence=e.confidence.value, + ) + records.append(dim) + + # For policy events, use event_date + title as unique key + inserted, updated = upsert_by_key( + sess, DimPolicyEvent, records, ["event_date", "title"] + ) + return inserted + updated + + if session: + return _load(session) + with get_session() as sess: + return _load(sess) diff --git a/portfolio_app/toronto/loaders/trreb.py b/portfolio_app/toronto/loaders/trreb.py new file mode 100644 index 0000000..06e4c8b --- /dev/null +++ b/portfolio_app/toronto/loaders/trreb.py @@ -0,0 +1,129 @@ +"""Loader for TRREB purchase data into fact_purchases.""" + +from sqlalchemy.orm import Session + +from portfolio_app.toronto.models import DimTime, DimTRREBDistrict, FactPurchases +from portfolio_app.toronto.schemas import TRREBMonthlyRecord, TRREBMonthlyReport + +from .base import get_session, upsert_by_key +from .dimensions import generate_date_key + + +def load_trreb_purchases( + report: TRREBMonthlyReport, + session: Session | None = None, +) -> int: + """Load TRREB monthly report data into fact_purchases. + + Args: + report: Validated TRREB monthly report containing records. + session: Optional existing session. + + Returns: + Number of records loaded. + """ + + def _load(sess: Session) -> int: + # Get district key mapping + districts = sess.query(DimTRREBDistrict).all() + district_map = {d.district_code: d.district_key for d in districts} + + # Build date key from report date + date_key = generate_date_key(report.report_date) + + # Verify time dimension exists + time_dim = sess.query(DimTime).filter_by(date_key=date_key).first() + if not time_dim: + raise ValueError( + f"Time dimension not found for date_key {date_key}. " + "Load time dimension first." + ) + + records = [] + for record in report.records: + district_key = district_map.get(record.area_code) + if not district_key: + # Skip records for unknown districts (e.g., aggregate rows) + continue + + fact = FactPurchases( + date_key=date_key, + district_key=district_key, + sales_count=record.sales, + dollar_volume=record.dollar_volume, + avg_price=record.avg_price, + median_price=record.median_price, + new_listings=record.new_listings, + active_listings=record.active_listings, + avg_dom=record.avg_dom, + avg_sp_lp=record.avg_sp_lp, + ) + records.append(fact) + + inserted, updated = upsert_by_key( + sess, FactPurchases, records, ["date_key", "district_key"] + ) + return inserted + updated + + if session: + return _load(session) + with get_session() as sess: + return _load(sess) + + +def load_trreb_record( + record: TRREBMonthlyRecord, + session: Session | None = None, +) -> int: + """Load a single TRREB record into fact_purchases. + + Args: + record: Single validated TRREB monthly record. + session: Optional existing session. + + Returns: + Number of records loaded (0 or 1). + """ + + def _load(sess: Session) -> int: + # Get district key + district = ( + sess.query(DimTRREBDistrict) + .filter_by(district_code=record.area_code) + .first() + ) + if not district: + return 0 + + date_key = generate_date_key(record.report_date) + + # Verify time dimension exists + time_dim = sess.query(DimTime).filter_by(date_key=date_key).first() + if not time_dim: + raise ValueError( + f"Time dimension not found for date_key {date_key}. " + "Load time dimension first." + ) + + fact = FactPurchases( + date_key=date_key, + district_key=district.district_key, + sales_count=record.sales, + dollar_volume=record.dollar_volume, + avg_price=record.avg_price, + median_price=record.median_price, + new_listings=record.new_listings, + active_listings=record.active_listings, + avg_dom=record.avg_dom, + avg_sp_lp=record.avg_sp_lp, + ) + + inserted, updated = upsert_by_key( + sess, FactPurchases, [fact], ["date_key", "district_key"] + ) + return inserted + updated + + if session: + return _load(session) + with get_session() as sess: + return _load(sess)