Files
personal-portfolio/portfolio_app/toronto/loaders/base.py
lmiranda 457bb49395 feat: add loaders and dbt models for Toronto housing data
Sprint 4 implementation:

Loaders:
- base.py: Session management, bulk insert, upsert utilities
- dimensions.py: Load time, district, zone, neighbourhood, policy dimensions
- trreb.py: Load TRREB purchase data to fact_purchases
- cmhc.py: Load CMHC rental data to fact_rentals

dbt Project:
- Project configuration (dbt_project.yml, packages.yml)
- Staging models for all fact and dimension tables
- Intermediate models with dimension enrichment
- Marts: purchase analysis, rental analysis, market summary

Closes #16

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 16:07:30 -05:00

86 lines
2.2 KiB
Python

"""Base loader utilities for database operations."""
from collections.abc import Generator
from contextlib import contextmanager
from typing import Any, TypeVar
from sqlalchemy.orm import Session
from portfolio_app.toronto.models import get_session_factory
T = TypeVar("T")
@contextmanager
def get_session() -> Generator[Session, None, None]:
"""Get a database session with automatic cleanup.
Yields:
SQLAlchemy session that auto-commits on success, rollbacks on error.
"""
session_factory = get_session_factory()
session = session_factory()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
def bulk_insert(session: Session, objects: list[T]) -> int:
"""Bulk insert objects into the database.
Args:
session: Active SQLAlchemy session.
objects: List of ORM model instances to insert.
Returns:
Number of objects inserted.
"""
session.add_all(objects)
session.flush()
return len(objects)
def upsert_by_key(
session: Session,
model_class: Any,
objects: list[T],
key_columns: list[str],
) -> tuple[int, int]:
"""Upsert objects based on unique key columns.
Args:
session: Active SQLAlchemy session.
model_class: The ORM model class.
objects: List of ORM model instances to upsert.
key_columns: Column names that form the unique key.
Returns:
Tuple of (inserted_count, updated_count).
"""
inserted = 0
updated = 0
for obj in objects:
# Build filter for existing record
filters = {col: getattr(obj, col) for col in key_columns}
existing = session.query(model_class).filter_by(**filters).first()
if existing:
# Update existing record
for column in model_class.__table__.columns:
if column.name not in key_columns and column.name != "id":
setattr(existing, column.name, getattr(obj, column.name))
updated += 1
else:
# Insert new record
session.add(obj)
inserted += 1
session.flush()
return inserted, updated