2 Commits

bf6e392002 feat: Sprint 10 - Architecture docs, CI/CD, operational scripts
Phase 1 - Architecture Documentation:
- Add Architecture section with Mermaid flowchart to README
- Create docs/DATABASE_SCHEMA.md with full ERD

Phase 2 - CI/CD:
- Add CI badge to README
- Create .gitea/workflows/ci.yml for linting and tests
- Create .gitea/workflows/deploy-staging.yml
- Create .gitea/workflows/deploy-production.yml

Phase 3 - Operational Scripts:
- Create scripts/logs.sh for docker compose log following
- Create scripts/run-detached.sh with health check loop
- Create scripts/etl/toronto.sh for Toronto data pipeline
- Add Makefile targets: logs, run-detached, etl-toronto

Phase 4 - Runbooks:
- Create docs/runbooks/adding-dashboard.md
- Create docs/runbooks/deployment.md

Phase 5 - Hygiene:
- Create MIT LICENSE file

Phase 6 - Production:
- Add live demo link to README (leodata.science)

Closes #78, #79, #80, #81, #82, #83, #84, #85, #86, #87, #88, #89, #91

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-17 17:10:30 -05:00
d0f32edba7 fix: Repair data pipeline with StatCan CMHC rental data
- Add StatCan CMHC parser to fetch rental data from Statistics Canada API
- Create year spine (2014-2025) as time dimension driver instead of census
- Add CMA-level rental and income intermediate models
- Update mart_neighbourhood_overview to use rental years as base
- Fix neighbourhood_service queries to match dbt schema
- Add CMHC data loading to pipeline script

Data now flows correctly: 158 neighbourhoods × 12 years = 1,896 records
Rent data available 2019-2025, crime data 2014-2024

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-17 15:38:31 -05:00
32 changed files with 2022 additions and 157 deletions

.gitea/workflows/ci.yml (new file)

@@ -0,0 +1,35 @@
name: CI

on:
  push:
    branches:
      - development
      - staging
      - main
  pull_request:
    branches:
      - development

jobs:
  lint-and-test:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install ruff pytest

      - name: Run linter
        run: ruff check .

      - name: Run tests
        run: pytest tests/ -v --tb=short

.gitea/workflows/deploy-production.yml (new file)

@@ -0,0 +1,44 @@
name: Deploy to Production

on:
  push:
    branches:
      - main

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Production Server
        uses: appleboy/ssh-action@v1.0.3
        with:
          host: ${{ secrets.PROD_HOST }}
          username: ${{ secrets.PROD_USER }}
          key: ${{ secrets.PROD_SSH_KEY }}
          script: |
            set -euo pipefail
            cd ~/apps/personal-portfolio
            echo "Pulling latest changes..."
            git fetch origin main
            git reset --hard origin/main
            echo "Activating virtual environment..."
            source .venv/bin/activate
            echo "Installing dependencies..."
            pip install -r requirements.txt --quiet
            echo "Running dbt models..."
            cd dbt && dbt run --profiles-dir . && cd ..
            echo "Restarting application..."
            docker compose down
            docker compose up -d
            echo "Waiting for health check..."
            sleep 10
            curl -f http://localhost:8050/health || exit 1
            echo "Production deployment complete!"

.gitea/workflows/deploy-staging.yml (new file)

@@ -0,0 +1,44 @@
name: Deploy to Staging

on:
  push:
    branches:
      - staging

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Staging Server
        uses: appleboy/ssh-action@v1.0.3
        with:
          host: ${{ secrets.STAGING_HOST }}
          username: ${{ secrets.STAGING_USER }}
          key: ${{ secrets.STAGING_SSH_KEY }}
          script: |
            set -euo pipefail
            cd ~/apps/personal-portfolio
            echo "Pulling latest changes..."
            git fetch origin staging
            git reset --hard origin/staging
            echo "Activating virtual environment..."
            source .venv/bin/activate
            echo "Installing dependencies..."
            pip install -r requirements.txt --quiet
            echo "Running dbt models..."
            cd dbt && dbt run --profiles-dir . && cd ..
            echo "Restarting application..."
            docker compose down
            docker compose up -d
            echo "Waiting for health check..."
            sleep 10
            curl -f http://localhost:8050/health || exit 1
            echo "Staging deployment complete!"

CLAUDE.md

@@ -18,7 +18,7 @@ Working context for Claude Code on the Analytics Portfolio project.
 ```bash
 make setup # Install deps, create .env, init pre-commit
-make docker-up # Start PostgreSQL + PostGIS
+make docker-up # Start PostgreSQL + PostGIS (auto-detects x86/ARM)
 make docker-down # Stop containers
 make db-init # Initialize database schema
 make run # Start Dash dev server
@@ -193,6 +193,7 @@ notebooks/ # Data documentation (Phase 6)
 - SQLAlchemy 2.0 + Pydantic 2.0 only (never mix 1.x APIs)
 - PostGIS extension required in database
 - Docker Compose V2 format (no `version` field)
+- **Multi-architecture support**: `make docker-up` auto-detects CPU architecture and uses the appropriate PostGIS image (x86_64: `postgis/postgis`, ARM64: `imresamu/postgis`)
 ---

LICENSE (new file)

@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2024-2025 Leo Miranda

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

Makefile

@@ -1,4 +1,4 @@
-.PHONY: setup docker-up docker-down db-init load-data run test dbt-run dbt-test lint format ci deploy clean help
+.PHONY: setup docker-up docker-down db-init load-data run test dbt-run dbt-test lint format ci deploy clean help logs run-detached etl-toronto
 
 # Default target
 .DEFAULT_GOAL := help
@@ -8,6 +8,17 @@ PYTHON := python3
 PIP := pip
 DOCKER_COMPOSE := docker compose
 
+# Architecture detection for Docker images
+ARCH := $(shell uname -m)
+ifeq ($(ARCH),aarch64)
+POSTGIS_IMAGE := imresamu/postgis:16-3.4
+else ifeq ($(ARCH),arm64)
+POSTGIS_IMAGE := imresamu/postgis:16-3.4
+else
+POSTGIS_IMAGE := postgis/postgis:16-3.4
+endif
+export POSTGIS_IMAGE
+
 # Colors for output
 BLUE := \033[0;34m
 GREEN := \033[0;32m
@@ -39,6 +50,7 @@ setup: ## Install dependencies, create .env, init pre-commit
 docker-up: ## Start PostgreSQL + PostGIS containers
 	@echo "$(GREEN)Starting database containers...$(NC)"
+	@echo "$(BLUE)Architecture: $(ARCH) -> Using image: $(POSTGIS_IMAGE)$(NC)"
 	$(DOCKER_COMPOSE) up -d
 	@echo "$(GREEN)Waiting for database to be ready...$(NC)"
 	@sleep 3
@@ -139,6 +151,19 @@ ci: ## Run all checks (lint, typecheck, test)
 	$(MAKE) test
 	@echo "$(GREEN)All checks passed!$(NC)"
 
+# =============================================================================
+# Operations
+# =============================================================================
+
+logs: ## Follow docker compose logs (usage: make logs or make logs SERVICE=postgres)
+	@./scripts/logs.sh $(SERVICE)
+
+run-detached: ## Start containers and wait for health check
+	@./scripts/run-detached.sh
+
+etl-toronto: ## Run Toronto ETL pipeline (usage: make etl-toronto MODE=--full)
+	@./scripts/etl/toronto.sh $(MODE)
+
 # =============================================================================
 # Deployment
 # =============================================================================

README.md

@@ -1,5 +1,9 @@
 # Analytics Portfolio
 
+[![CI](https://gitea.hotserv.cloud/lmiranda/personal-portfolio/actions/workflows/ci.yml/badge.svg)](https://gitea.hotserv.cloud/lmiranda/personal-portfolio/actions)
+
+**Live Demo:** [leodata.science](https://leodata.science)
+
 A personal portfolio website showcasing data engineering and visualization capabilities, featuring an interactive Toronto Neighbourhood Dashboard.
 
 ## Live Pages
@@ -32,6 +36,42 @@ An interactive choropleth dashboard analyzing Toronto's 158 official neighbourho
 - Toronto Police Service (crime statistics)
 - CMHC Rental Market Survey (rental data by zone)
 
+## Architecture
+
+```mermaid
+flowchart LR
+    subgraph Sources
+        A1[City of Toronto API]
+        A2[Toronto Police API]
+        A3[CMHC Data]
+    end
+    subgraph ETL
+        B1[Parsers]
+        B2[Loaders]
+    end
+    subgraph Database
+        C1[(PostgreSQL/PostGIS)]
+        C2[dbt Models]
+    end
+    subgraph Application
+        D1[Dash App]
+        D2[Plotly Figures]
+    end
+    A1 & A2 & A3 --> B1 --> B2 --> C1 --> C2 --> D1 --> D2
+```
+
+**Pipeline Stages:**
+
+- **Sources**: External APIs and data files (City of Toronto, Toronto Police, CMHC)
+- **ETL**: Python parsers extract and validate data; loaders persist to database
+- **Database**: PostgreSQL with PostGIS for geospatial; dbt transforms raw → staging → marts
+- **Application**: Dash serves interactive dashboards with Plotly visualizations
+
+For detailed database schema, see [docs/DATABASE_SCHEMA.md](docs/DATABASE_SCHEMA.md).
+
 ## Quick Start
 
 ```bash
dbt/models/intermediate/int_census__toronto_cma.sql (new file)

@@ -0,0 +1,60 @@
-- Intermediate: Toronto CMA census statistics by year
-- Provides city-wide averages for metrics not available at neighbourhood level
-- Used when neighbourhood-level data is unavailable (e.g., median household income)
-- Grain: One row per year

with years as (
    select * from {{ ref('int_year_spine') }}
),

census as (
    select * from {{ ref('stg_toronto__census') }}
),

-- Census data is only available for 2016 and 2021
-- Map each analysis year to the appropriate census year
year_to_census as (
    select
        y.year,
        case
            when y.year <= 2018 then 2016
            else 2021
        end as census_year
    from years y
),

-- Toronto CMA median household income from Statistics Canada
-- Source: Census Profile Table 98-316-X2021001
-- 2016: $65,829 (from Census Profile)
-- 2021: $84,000 (from Census Profile)
cma_income as (
    select 2016 as census_year, 65829 as median_household_income union all
    select 2021 as census_year, 84000 as median_household_income
),

-- City-wide aggregates from loaded neighbourhood data
city_aggregates as (
    select
        census_year,
        sum(population) as total_population,
        avg(population_density) as avg_population_density,
        avg(unemployment_rate) as avg_unemployment_rate
    from census
    where population is not null
    group by census_year
),

final as (
    select
        y.year,
        y.census_year,
        ci.median_household_income,
        ca.total_population,
        ca.avg_population_density,
        ca.avg_unemployment_rate
    from year_to_census y
    left join cma_income ci on y.census_year = ci.census_year
    left join city_aggregates ca on y.census_year = ca.census_year
)

select * from final

dbt/models/intermediate/int_neighbourhood__amenity_scores.sql

@@ -34,7 +34,7 @@ amenity_scores as (
         n.population,
         n.land_area_sqkm,
-        a.year,
+        coalesce(a.year, 2021) as year,
 
         -- Raw counts
         a.parks_count,
dbt/models/intermediate/int_neighbourhood__crime_summary.sql

@@ -64,15 +64,17 @@ crime_summary as (
         w.robbery_count,
         w.theft_over_count,
         w.homicide_count,
+        w.avg_rate_per_100k,
         w.yoy_change_pct,
 
-        -- Crime rate per 100K population
-        case
-            when n.population > 0
-            then round(w.total_incidents::numeric / n.population * 100000, 2)
-            else null
-        end as crime_rate_per_100k
+        -- Crime rate per 100K population (use source data avg, or calculate if population available)
+        coalesce(
+            w.avg_rate_per_100k,
+            case
+                when n.population > 0
+                then round(w.total_incidents::numeric / n.population * 100000, 2)
+                else null
+            end
+        ) as crime_rate_per_100k
 
     from neighbourhoods n
     inner join with_yoy w on n.neighbourhood_id = w.neighbourhood_id

dbt/models/intermediate/int_neighbourhood__demographics.sql

@@ -17,7 +17,8 @@ demographics as (
         n.geometry,
         n.land_area_sqkm,
-        c.census_year,
+        -- Use census_year from census data, or fall back to dim_neighbourhood's year
+        coalesce(c.census_year, n.census_year, 2021) as census_year,
         c.population,
         c.population_density,
         c.median_household_income,
dbt/models/intermediate/int_neighbourhood__housing.sql

@@ -20,7 +20,7 @@ housing as (
         n.neighbourhood_name,
         n.geometry,
-        coalesce(r.year, c.census_year) as year,
+        coalesce(r.year, c.census_year, 2021) as year,
 
         -- Census housing metrics
         c.pct_owner_occupied,
dbt/models/intermediate/int_rentals__toronto_cma.sql (new file)

@@ -0,0 +1,25 @@
-- Intermediate: Toronto CMA rental metrics by year
-- Aggregates rental data to city-wide averages by year
-- Source: StatCan CMHC data at CMA level
-- Grain: One row per year

with rentals as (
    select * from {{ ref('stg_cmhc__rentals') }}
),

-- Pivot bedroom types to columns
yearly_rentals as (
    select
        year,
        max(case when bedroom_type = 'bachelor' then avg_rent end) as avg_rent_bachelor,
        max(case when bedroom_type = '1bed' then avg_rent end) as avg_rent_1bed,
        max(case when bedroom_type = '2bed' then avg_rent end) as avg_rent_2bed,
        max(case when bedroom_type = '3bed' then avg_rent end) as avg_rent_3bed,
        -- Use 2-bedroom as standard reference
        max(case when bedroom_type = '2bed' then avg_rent end) as avg_rent_standard,
        max(vacancy_rate) as vacancy_rate
    from rentals
    group by year
)

select * from yearly_rentals

dbt/models/intermediate/int_year_spine.sql (new file)

@@ -0,0 +1,11 @@
-- Intermediate: Year spine for analysis
-- Creates a row for each year from 2014-2025
-- Used to drive time-series analysis across all data sources

with years as (
    -- Generate years from available data sources
    -- Crime data: 2014-2024, Rentals: 2019-2025
    select generate_series(2014, 2025) as year
)

select year from years

dbt/models/marts/mart_neighbourhood_overview.sql

@@ -1,79 +1,119 @@
 -- Mart: Neighbourhood Overview with Composite Livability Score
 -- Dashboard Tab: Overview
 -- Grain: One row per neighbourhood per year
+-- Time spine: Years 2014-2025 (driven by crime/rental data availability)
 
-with demographics as (
-    select * from {{ ref('int_neighbourhood__demographics') }}
+with years as (
+    select * from {{ ref('int_year_spine') }}
 ),
 
-housing as (
-    select * from {{ ref('int_neighbourhood__housing') }}
+neighbourhoods as (
+    select * from {{ ref('stg_toronto__neighbourhoods') }}
 ),
 
+-- Create base: all neighbourhoods × all years
+neighbourhood_years as (
+    select
+        n.neighbourhood_id,
+        n.neighbourhood_name,
+        n.geometry,
+        y.year
+    from neighbourhoods n
+    cross join years y
+),
+
+-- Census data (available for 2016, 2021)
+-- For each year, use the most recent census data available
+census as (
+    select * from {{ ref('stg_toronto__census') }}
+),
+
+census_mapped as (
+    select
+        ny.neighbourhood_id,
+        ny.year,
+        c.population,
+        c.unemployment_rate,
+        c.pct_bachelors_or_higher as education_bachelors_pct
+    from neighbourhood_years ny
+    left join census c on ny.neighbourhood_id = c.neighbourhood_id
+        -- Use census year <= analysis year, prefer most recent
+        and c.census_year = (
+            select max(c2.census_year)
+            from {{ ref('stg_toronto__census') }} c2
+            where c2.neighbourhood_id = ny.neighbourhood_id
+                and c2.census_year <= ny.year
+        )
+),
+
+-- CMA-level census data (for income - not available at neighbourhood level)
+cma_census as (
+    select * from {{ ref('int_census__toronto_cma') }}
+),
+
+-- Crime data (2014-2024)
 crime as (
     select * from {{ ref('int_neighbourhood__crime_summary') }}
 ),
 
-amenities as (
-    select * from {{ ref('int_neighbourhood__amenity_scores') }}
+-- Rentals (2019-2025) - CMA level applied to all neighbourhoods
+rentals as (
+    select * from {{ ref('int_rentals__toronto_cma') }}
 ),
 
--- Compute percentile ranks for scoring components
-percentiles as (
+-- Compute scores
+scored as (
     select
-        d.neighbourhood_id,
-        d.neighbourhood_name,
-        d.geometry,
-        d.census_year as year,
-        d.population,
-        d.median_household_income,
+        ny.neighbourhood_id,
+        ny.neighbourhood_name,
+        ny.geometry,
+        ny.year,
+        cm.population,
+        -- Use CMA-level income (neighbourhood-level not available in Toronto Open Data)
+        cma.median_household_income,
 
         -- Safety score: inverse of crime rate (higher = safer)
         case
-            when c.crime_rate_per_100k is not null
+            when cr.crime_rate_per_100k is not null
             then 100 - percent_rank() over (
-                partition by d.census_year
-                order by c.crime_rate_per_100k
+                partition by ny.year
+                order by cr.crime_rate_per_100k
             ) * 100
             else null
         end as safety_score,
 
         -- Affordability score: inverse of rent-to-income ratio
+        -- Using CMA-level income since neighbourhood-level not available
         case
-            when h.rent_to_income_pct is not null
+            when cma.median_household_income > 0 and r.avg_rent_standard > 0
             then 100 - percent_rank() over (
-                partition by d.census_year
-                order by h.rent_to_income_pct
+                partition by ny.year
+                order by (r.avg_rent_standard * 12 / cma.median_household_income)
             ) * 100
             else null
         end as affordability_score,
 
-        -- Amenity score: based on amenities per capita
-        case
-            when a.total_amenities_per_1000 is not null
-            then percent_rank() over (
-                partition by d.census_year
-                order by a.total_amenities_per_1000
-            ) * 100
-            else null
-        end as amenity_score,
-
-        -- Raw metrics for reference
-        c.crime_rate_per_100k,
-        h.rent_to_income_pct,
-        h.avg_rent_2bed,
-        a.total_amenities_per_1000
-
-    from demographics d
-    left join housing h
-        on d.neighbourhood_id = h.neighbourhood_id
-        and d.census_year = h.year
-    left join crime c
-        on d.neighbourhood_id = c.neighbourhood_id
-        and d.census_year = c.year
-    left join amenities a
-        on d.neighbourhood_id = a.neighbourhood_id
-        and d.census_year = a.year
+        -- Raw metrics
+        cr.crime_rate_per_100k,
+        case
+            when cma.median_household_income > 0 and r.avg_rent_standard > 0
+            then round((r.avg_rent_standard * 12 / cma.median_household_income) * 100, 2)
+            else null
+        end as rent_to_income_pct,
+        r.avg_rent_standard as avg_rent_2bed,
+        r.vacancy_rate
+
+    from neighbourhood_years ny
+    left join census_mapped cm
+        on ny.neighbourhood_id = cm.neighbourhood_id
+        and ny.year = cm.year
+    left join cma_census cma
+        on ny.year = cma.year
+    left join crime cr
+        on ny.neighbourhood_id = cr.neighbourhood_id
+        and ny.year = cr.year
+    left join rentals r
+        on ny.year = r.year
 ),
 
 final as (
@@ -88,13 +128,14 @@ final as (
         -- Component scores (0-100)
         round(safety_score::numeric, 1) as safety_score,
         round(affordability_score::numeric, 1) as affordability_score,
-        round(amenity_score::numeric, 1) as amenity_score,
+        -- Amenity score not available at this level, use placeholder
+        50.0 as amenity_score,
 
-        -- Composite livability score: safety (30%), affordability (40%), amenities (30%)
+        -- Composite livability score: safety (40%), affordability (40%), amenities (20%)
         round(
-            (coalesce(safety_score, 50) * 0.30 +
+            (coalesce(safety_score, 50) * 0.40 +
             coalesce(affordability_score, 50) * 0.40 +
-             coalesce(amenity_score, 50) * 0.30)::numeric,
+             50 * 0.20)::numeric,
             1
         ) as livability_score,
@@ -102,9 +143,10 @@ final as (
         crime_rate_per_100k,
         rent_to_income_pct,
         avg_rent_2bed,
-        total_amenities_per_1000
+        vacancy_rate,
+        null::numeric as total_amenities_per_1000
 
-    from percentiles
+    from scored
 )
 
 select * from final

dbt/models/staging/stg_cmhc__rentals.sql

@@ -1,9 +1,13 @@
 -- Staged CMHC rental market survey data
--- Source: fact_rentals table loaded from CMHC CSV exports
+-- Source: fact_rentals table loaded from CMHC/StatCan
 -- Grain: One row per zone per bedroom type per survey year
 
 with source as (
-    select * from {{ source('toronto_housing', 'fact_rentals') }}
+    select
+        f.*,
+        t.year as survey_year
+    from {{ source('toronto_housing', 'fact_rentals') }} f
+    join {{ source('toronto_housing', 'dim_time') }} t on f.date_key = t.date_key
 ),
 
 staged as (
@@ -11,6 +15,7 @@ staged as (
         id as rental_id,
         date_key,
         zone_key,
+        survey_year as year,
         bedroom_type,
         universe as rental_universe,
         avg_rent,
docker-compose.yml

@@ -1,6 +1,6 @@
 services:
   db:
-    image: postgis/postgis:16-3.4
+    image: ${POSTGIS_IMAGE:-postgis/postgis:16-3.4}
     container_name: portfolio-db
     restart: unless-stopped
     ports:

docs/DATABASE_SCHEMA.md (new file)

@@ -0,0 +1,307 @@
# Database Schema
This document describes the PostgreSQL/PostGIS database schema for the Toronto Neighbourhood Dashboard.
## Entity Relationship Diagram
```mermaid
erDiagram
dim_time {
int date_key PK
date full_date UK
int year
int month
int quarter
string month_name
bool is_month_start
}
dim_cmhc_zone {
int zone_key PK
string zone_code UK
string zone_name
geometry geometry
}
dim_neighbourhood {
int neighbourhood_id PK
string name
geometry geometry
int population
numeric land_area_sqkm
numeric pop_density_per_sqkm
numeric pct_bachelors_or_higher
numeric median_household_income
numeric pct_owner_occupied
numeric pct_renter_occupied
int census_year
}
dim_policy_event {
int event_id PK
date event_date
date effective_date
string level
string category
string title
text description
string expected_direction
string source_url
string confidence
}
fact_rentals {
int id PK
int date_key FK
int zone_key FK
string bedroom_type
int universe
numeric avg_rent
numeric median_rent
numeric vacancy_rate
numeric availability_rate
numeric turnover_rate
numeric rent_change_pct
string reliability_code
}
fact_census {
int id PK
int neighbourhood_id FK
int census_year
int population
numeric population_density
numeric median_household_income
numeric average_household_income
numeric unemployment_rate
numeric pct_bachelors_or_higher
numeric pct_owner_occupied
numeric pct_renter_occupied
numeric median_age
numeric average_dwelling_value
}
fact_crime {
int id PK
int neighbourhood_id FK
int year
string crime_type
int count
numeric rate_per_100k
}
fact_amenities {
int id PK
int neighbourhood_id FK
string amenity_type
int count
int year
}
bridge_cmhc_neighbourhood {
int id PK
string cmhc_zone_code FK
int neighbourhood_id FK
numeric weight
}
dim_time ||--o{ fact_rentals : "date_key"
dim_cmhc_zone ||--o{ fact_rentals : "zone_key"
dim_neighbourhood ||--o{ fact_census : "neighbourhood_id"
dim_neighbourhood ||--o{ fact_crime : "neighbourhood_id"
dim_neighbourhood ||--o{ fact_amenities : "neighbourhood_id"
dim_cmhc_zone ||--o{ bridge_cmhc_neighbourhood : "zone_code"
dim_neighbourhood ||--o{ bridge_cmhc_neighbourhood : "neighbourhood_id"
```
## Schema Layers
### Raw Schema
Raw data is loaded directly from external sources without transformation:
| Table | Source | Description |
|-------|--------|-------------|
| `raw.neighbourhoods` | City of Toronto API | GeoJSON neighbourhood boundaries |
| `raw.census_profiles` | City of Toronto API | Census profile data |
| `raw.crime_data` | Toronto Police API | Crime statistics by neighbourhood |
| `raw.cmhc_rentals` | CMHC Data Files | Rental market survey data |
### Staging Schema (dbt)
Staging models provide 1:1 cleaned representations of source data:
| Model | Source Table | Purpose |
|-------|-------------|---------|
| `stg_toronto__neighbourhoods` | raw.neighbourhoods | Cleaned boundaries with standardized names |
| `stg_toronto__census` | raw.census_profiles | Typed census metrics |
| `stg_cmhc__rentals` | raw.cmhc_rentals | Validated rental data |
| `stg_police__crimes` | raw.crime_data | Standardized crime categories |
### Marts Schema (dbt)
Analytical tables ready for dashboard consumption:
| Model | Grain | Purpose |
|-------|-------|---------|
| `mart_neighbourhood_summary` | neighbourhood | Composite livability scores |
| `mart_rental_trends` | zone × month | Time-series rental analysis |
| `mart_crime_rates` | neighbourhood × year | Crime rate calculations |
| `mart_amenity_density` | neighbourhood | Amenity accessibility scores |
## Table Details
### Dimension Tables
#### dim_time
Time dimension for date-based analysis. Grain: one row per month.
| Column | Type | Constraints | Description |
|--------|------|-------------|-------------|
| date_key | INTEGER | PK | Surrogate key (YYYYMM format) |
| full_date | DATE | UNIQUE, NOT NULL | First day of month |
| year | INTEGER | NOT NULL | Calendar year |
| month | INTEGER | NOT NULL | Month number (1-12) |
| quarter | INTEGER | NOT NULL | Quarter (1-4) |
| month_name | VARCHAR(20) | NOT NULL | Month name |
| is_month_start | BOOLEAN | DEFAULT TRUE | Always true (monthly grain) |
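
The `date_key` is a plain YYYYMM integer, so it can be derived directly from a date. A minimal sketch of the convention (illustrative SQL; the project's own `generate_date_key` helper lives in the Python loaders and is assumed here to follow the same rule):

```sql
-- Illustrative only: 2023-10-01 -> 202310
select (extract(year from d)::int * 100 + extract(month from d)::int) as date_key
from (values (date '2023-10-01')) as example(d);
```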
#### dim_cmhc_zone
CMHC rental market zones (~20 zones covering Toronto).
| Column | Type | Constraints | Description |
|--------|------|-------------|-------------|
| zone_key | INTEGER | PK, AUTO | Surrogate key |
| zone_code | VARCHAR(10) | UNIQUE, NOT NULL | CMHC zone identifier |
| zone_name | VARCHAR(100) | NOT NULL | Zone display name |
| geometry | GEOMETRY(POLYGON) | SRID 4326 | PostGIS zone boundary |
#### dim_neighbourhood
Toronto's 158 official neighbourhoods.
| Column | Type | Constraints | Description |
|--------|------|-------------|-------------|
| neighbourhood_id | INTEGER | PK | City-assigned ID |
| name | VARCHAR(100) | NOT NULL | Neighbourhood name |
| geometry | GEOMETRY(POLYGON) | SRID 4326 | PostGIS boundary |
| population | INTEGER | | Total population |
| land_area_sqkm | NUMERIC(10,4) | | Area in km² |
| pop_density_per_sqkm | NUMERIC(10,2) | | Population density |
| pct_bachelors_or_higher | NUMERIC(5,2) | | Education rate |
| median_household_income | NUMERIC(12,2) | | Median income |
| pct_owner_occupied | NUMERIC(5,2) | | Owner occupancy rate |
| pct_renter_occupied | NUMERIC(5,2) | | Renter occupancy rate |
| census_year | INTEGER | DEFAULT 2021 | Census reference year |
#### dim_policy_event
Policy events for time-series annotation (rent control, interest rates, etc.).
| Column | Type | Constraints | Description |
|--------|------|-------------|-------------|
| event_id | INTEGER | PK, AUTO | Surrogate key |
| event_date | DATE | NOT NULL | Announcement date |
| effective_date | DATE | | Implementation date |
| level | VARCHAR(20) | NOT NULL | federal/provincial/municipal |
| category | VARCHAR(20) | NOT NULL | monetary/tax/regulatory/supply/economic |
| title | VARCHAR(200) | NOT NULL | Event title |
| description | TEXT | | Detailed description |
| expected_direction | VARCHAR(10) | NOT NULL | bearish/bullish/neutral |
| source_url | VARCHAR(500) | | Reference link |
| confidence | VARCHAR(10) | DEFAULT 'medium' | high/medium/low |
### Fact Tables
#### fact_rentals
CMHC rental market survey data. Grain: zone × bedroom type × survey date.
| Column | Type | Constraints | Description |
|--------|------|-------------|-------------|
| id | INTEGER | PK, AUTO | Surrogate key |
| date_key | INTEGER | FK → dim_time | Survey date reference |
| zone_key | INTEGER | FK → dim_cmhc_zone | CMHC zone reference |
| bedroom_type | VARCHAR(20) | NOT NULL | bachelor/1-bed/2-bed/3+bed/total |
| universe | INTEGER | | Total rental units |
| avg_rent | NUMERIC(10,2) | | Average rent |
| median_rent | NUMERIC(10,2) | | Median rent |
| vacancy_rate | NUMERIC(5,2) | | Vacancy percentage |
| availability_rate | NUMERIC(5,2) | | Availability percentage |
| turnover_rate | NUMERIC(5,2) | | Turnover percentage |
| rent_change_pct | NUMERIC(5,2) | | Year-over-year change |
| reliability_code | VARCHAR(2) | | CMHC data quality code |
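
The loaders upsert into this table on `(date_key, zone_key, bedroom_type)`, which matches the stated grain. A sketch of the uniqueness constraint that implies (the exact DDL is an assumption, not taken from the migrations):

```sql
-- Illustrative only: uniqueness implied by the loaders' upsert key
create unique index if not exists ux_fact_rentals_date_zone_bedroom
    on fact_rentals (date_key, zone_key, bedroom_type);
```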
#### fact_census
Census statistics. Grain: neighbourhood × census year.
| Column | Type | Constraints | Description |
|--------|------|-------------|-------------|
| id | INTEGER | PK, AUTO | Surrogate key |
| neighbourhood_id | INTEGER | FK → dim_neighbourhood | Neighbourhood reference |
| census_year | INTEGER | NOT NULL | 2016, 2021, etc. |
| population | INTEGER | | Total population |
| population_density | NUMERIC(10,2) | | People per km² |
| median_household_income | NUMERIC(12,2) | | Median income |
| average_household_income | NUMERIC(12,2) | | Average income |
| unemployment_rate | NUMERIC(5,2) | | Unemployment % |
| pct_bachelors_or_higher | NUMERIC(5,2) | | Education rate |
| pct_owner_occupied | NUMERIC(5,2) | | Owner rate |
| pct_renter_occupied | NUMERIC(5,2) | | Renter rate |
| median_age | NUMERIC(5,2) | | Median resident age |
| average_dwelling_value | NUMERIC(12,2) | | Average home value |
#### fact_crime
Crime statistics. Grain: neighbourhood × year × crime type.
| Column | Type | Constraints | Description |
|--------|------|-------------|-------------|
| id | INTEGER | PK, AUTO | Surrogate key |
| neighbourhood_id | INTEGER | FK → dim_neighbourhood | Neighbourhood reference |
| year | INTEGER | NOT NULL | Calendar year |
| crime_type | VARCHAR(50) | NOT NULL | Crime category |
| count | INTEGER | NOT NULL | Number of incidents |
| rate_per_100k | NUMERIC(10,2) | | Rate per 100k population |
#### fact_amenities
Amenity counts. Grain: neighbourhood × amenity type × year.
| Column | Type | Constraints | Description |
|--------|------|-------------|-------------|
| id | INTEGER | PK, AUTO | Surrogate key |
| neighbourhood_id | INTEGER | FK → dim_neighbourhood | Neighbourhood reference |
| amenity_type | VARCHAR(50) | NOT NULL | parks/schools/transit/etc. |
| count | INTEGER | NOT NULL | Number of amenities |
| year | INTEGER | NOT NULL | Reference year |
### Bridge Tables
#### bridge_cmhc_neighbourhood
Maps CMHC zones to neighbourhoods with area-based weights for data disaggregation.
| Column | Type | Constraints | Description |
|--------|------|-------------|-------------|
| id | INTEGER | PK, AUTO | Surrogate key |
| cmhc_zone_code | VARCHAR(10) | FK → dim_cmhc_zone | Zone reference |
| neighbourhood_id | INTEGER | FK → dim_neighbourhood | Neighbourhood reference |
| weight | NUMERIC(5,4) | NOT NULL | Proportional weight (0-1) |
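
As an illustration of how the weights are meant to be used, a zone-level metric can be apportioned to neighbourhoods through the bridge. A minimal sketch (illustrative query, not a shipped dbt model; assumes `zone_code` joins to `cmhc_zone_code`):

```sql
-- Illustrative only: estimate neighbourhood-level average rent from zone data
select
    b.neighbourhood_id,
    sum(f.avg_rent * b.weight) / nullif(sum(b.weight), 0) as est_avg_rent_2bed
from fact_rentals f
join dim_cmhc_zone z on f.zone_key = z.zone_key
join bridge_cmhc_neighbourhood b on b.cmhc_zone_code = z.zone_code
where f.bedroom_type = '2-bed'
group by b.neighbourhood_id;
```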
## Indexes
| Table | Index | Columns | Purpose |
|-------|-------|---------|---------|
| fact_rentals | ix_fact_rentals_date_zone | date_key, zone_key | Time-series queries |
| fact_census | ix_fact_census_neighbourhood_year | neighbourhood_id, census_year | Census lookups |
| fact_crime | ix_fact_crime_neighbourhood_year | neighbourhood_id, year | Crime trends |
| fact_crime | ix_fact_crime_type | crime_type | Crime filtering |
| fact_amenities | ix_fact_amenities_neighbourhood_year | neighbourhood_id, year | Amenity queries |
| fact_amenities | ix_fact_amenities_type | amenity_type | Amenity filtering |
| bridge_cmhc_neighbourhood | ix_bridge_cmhc_zone | cmhc_zone_code | Zone lookups |
| bridge_cmhc_neighbourhood | ix_bridge_neighbourhood | neighbourhood_id | Neighbourhood lookups |
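
Each of these is a plain composite B-tree index; for example (a sketch of the DDL, assuming standard PostgreSQL syntax rather than the project's actual migrations):

```sql
-- Illustrative only
create index if not exists ix_fact_crime_neighbourhood_year
    on fact_crime (neighbourhood_id, year);
```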
## PostGIS Extensions
The database requires PostGIS for geospatial operations:
```sql
CREATE EXTENSION IF NOT EXISTS postgis;
```
All geometry columns use SRID 4326 (WGS84) for compatibility with web mapping libraries.
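
A typical point-in-polygon lookup against these columns looks like the following sketch (illustrative coordinates for downtown Toronto):

```sql
-- Illustrative only: find the neighbourhood containing a WGS84 point
select neighbourhood_id, name
from dim_neighbourhood
where ST_Contains(
    geometry,
    ST_SetSRID(ST_MakePoint(-79.3832, 43.6532), 4326)  -- lon, lat
);
```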

docs/runbooks/adding-dashboard.md (new file)

@@ -0,0 +1,200 @@
# Runbook: Adding a New Dashboard
This runbook describes how to add a new data dashboard to the portfolio application.
## Prerequisites
- [ ] Data sources identified and accessible
- [ ] Database schema designed
- [ ] Basic Dash/Plotly familiarity
## Directory Structure
Create the following structure under `portfolio_app/`:
```
portfolio_app/
├── pages/
│ └── {dashboard_name}/
│ ├── dashboard.py # Main layout with tabs
│ ├── methodology.py # Data sources and methods page
│ ├── tabs/
│ │ ├── __init__.py
│ │ ├── overview.py # Overview tab layout
│ │ └── ... # Additional tab layouts
│ └── callbacks/
│ ├── __init__.py
│ └── ... # Callback modules
├── {dashboard_name}/ # Data logic (outside pages/)
│ ├── __init__.py
│ ├── parsers/ # API/CSV extraction
│ │ └── __init__.py
│ ├── loaders/ # Database operations
│ │ └── __init__.py
│ ├── schemas/ # Pydantic models
│ │ └── __init__.py
│ └── models/ # SQLAlchemy ORM
│ └── __init__.py
```
## Step-by-Step Checklist
### 1. Data Layer
- [ ] Create Pydantic schemas in `{dashboard_name}/schemas/`
- [ ] Create SQLAlchemy models in `{dashboard_name}/models/`
- [ ] Create parsers in `{dashboard_name}/parsers/`
- [ ] Create loaders in `{dashboard_name}/loaders/`
- [ ] Add database migrations if needed
### 2. dbt Models
Create dbt models in `dbt/models/`:
- [ ] `staging/stg_{source}__{entity}.sql` - Raw data cleaning
- [ ] `intermediate/int_{domain}__{transform}.sql` - Business logic
- [ ] `marts/mart_{domain}.sql` - Final analytical tables
Follow naming conventions:
- Staging: `stg_{source}__{entity}`
- Intermediate: `int_{domain}__{transform}`
- Marts: `mart_{domain}`
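
As a sketch of what a staging model following these conventions looks like (hypothetical source and column names, shown only to illustrate the pattern):

```sql
-- models/staging/stg_{source}__{entity}.sql (illustrative skeleton)
with source as (
    select * from {{ source('my_source', 'my_entity') }}
),

staged as (
    select
        id as my_entity_id,                -- rename surrogate keys
        created_at::date as created_date   -- cast and standardize types
    from source
)

select * from staged
```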
### 3. Visualization Layer
- [ ] Create figure factories in `figures/` (or reuse existing)
- [ ] Follow the factory pattern: `create_{chart_type}_figure(data, **kwargs)`
### 4. Dashboard Pages
#### Main Dashboard (`pages/{dashboard_name}/dashboard.py`)
```python
import dash
from dash import html, dcc
import dash_mantine_components as dmc

dash.register_page(
    __name__,
    path="/{dashboard_name}",
    title="{Dashboard Title}",
    description="{Description}"
)


def layout():
    return dmc.Container([
        # Header
        dmc.Title("{Dashboard Title}", order=1),

        # Tabs
        dmc.Tabs([
            dmc.TabsList([
                dmc.TabsTab("Overview", value="overview"),
                # Add more tabs
            ]),
            dmc.TabsPanel(overview_tab(), value="overview"),
            # Add more panels
        ], value="overview"),
    ])
```
#### Tab Layouts (`pages/{dashboard_name}/tabs/`)
- [ ] Create one file per tab
- [ ] Export layout function from each
#### Callbacks (`pages/{dashboard_name}/callbacks/`)
- [ ] Create callback modules for interactivity
- [ ] Import and register in dashboard.py
### 5. Navigation
Add to sidebar in `components/sidebar.py`:
```python
dmc.NavLink(
    label="{Dashboard Name}",
    href="/{dashboard_name}",
    icon=DashIconify(icon="..."),
)
```
### 6. Documentation
- [ ] Create methodology page (`pages/{dashboard_name}/methodology.py`)
- [ ] Document data sources
- [ ] Document transformation logic
- [ ] Add notebooks to `notebooks/{dashboard_name}/` if needed
### 7. Testing
- [ ] Add unit tests for parsers
- [ ] Add unit tests for loaders
- [ ] Add integration tests for callbacks
- [ ] Run `make test`
### 8. Final Verification
- [ ] All pages render without errors
- [ ] All callbacks respond correctly
- [ ] Data loads successfully
- [ ] dbt models run cleanly (`make dbt-run`)
- [ ] Linting passes (`make lint`)
- [ ] Tests pass (`make test`)
## Example: Toronto Dashboard
Reference implementation: `portfolio_app/pages/toronto/`
Key files:
- `dashboard.py` - Main layout with 5 tabs
- `tabs/overview.py` - Livability scores, scatter plots
- `callbacks/map_callbacks.py` - Choropleth interactions
- `toronto/models/dimensions.py` - Dimension tables
- `toronto/models/facts.py` - Fact tables
## Common Patterns
### Figure Factories
```python
# figures/choropleth.py
import geopandas as gpd
import plotly.graph_objects as go


def create_choropleth_figure(
    gdf: gpd.GeoDataFrame,
    value_column: str,
    title: str,
    **kwargs,
) -> go.Figure:
    ...
```
### Callbacks
```python
# callbacks/map_callbacks.py
from dash import Input, Output, callback


@callback(
    Output("neighbourhood-details", "children"),
    Input("choropleth-map", "clickData"),
)
def update_details(click_data):
    ...
```
### Data Loading
```python
# {dashboard_name}/loaders/load.py
from sqlalchemy.orm import Session


def load_data(session: Session) -> None:
    # Parse from source
    records = parse_source_data()

    # Validate with Pydantic
    validated = [Schema(**r) for r in records]

    # Load to database
    for record in validated:
        session.add(Model(**record.model_dump()))
    session.commit()
```

docs/runbooks/deployment.md (new file)

@@ -0,0 +1,232 @@
# Runbook: Deployment
This runbook covers deployment procedures for the Analytics Portfolio application.
## Environments
| Environment | Branch | Server | URL |
|-------------|--------|--------|-----|
| Development | `development` | Local | http://localhost:8050 |
| Staging | `staging` | Homelab (hotserv) | Internal |
| Production | `main` | Bandit Labs VPS | https://leodata.science |
## CI/CD Pipeline
### Automatic Deployment
Deployments are triggered automatically via Gitea Actions:
1. **Push to `staging`** → Deploys to staging server
2. **Push to `main`** → Deploys to production server
### Workflow Files
- `.gitea/workflows/ci.yml` - Runs linting and tests on all branches
- `.gitea/workflows/deploy-staging.yml` - Staging deployment
- `.gitea/workflows/deploy-production.yml` - Production deployment
### Required Secrets
Configure these in Gitea repository settings:
| Secret | Description |
|--------|-------------|
| `STAGING_HOST` | Staging server hostname/IP |
| `STAGING_USER` | SSH username for staging |
| `STAGING_SSH_KEY` | Private key for staging SSH |
| `PROD_HOST` | Production server hostname/IP |
| `PROD_USER` | SSH username for production |
| `PROD_SSH_KEY` | Private key for production SSH |
## Manual Deployment
### Prerequisites
- SSH access to target server
- Repository cloned at `~/apps/personal-portfolio`
- Virtual environment created at `.venv`
- Docker and Docker Compose installed
- PostgreSQL container running
### Steps
```bash
# 1. SSH to server
ssh user@server
# 2. Navigate to app directory
cd ~/apps/personal-portfolio
# 3. Pull latest changes
git fetch origin {branch}
git reset --hard origin/{branch}
# 4. Activate virtual environment
source .venv/bin/activate
# 5. Install dependencies
pip install -r requirements.txt
# 6. Run database migrations (if any)
# python -m alembic upgrade head
# 7. Run dbt models
cd dbt && dbt run --profiles-dir . && cd ..
# 8. Restart application
docker compose down
docker compose up -d
# 9. Verify health
curl http://localhost:8050/health
```
## Rollback Procedure
### Quick Rollback
If deployment fails, rollback to previous commit:
```bash
# 1. Find previous working commit
git log --oneline -10
# 2. Reset to that commit
git reset --hard {commit_hash}
# 3. Restart services
docker compose down
docker compose up -d
# 4. Verify
curl http://localhost:8050/health
```
### Full Rollback (Database)
If database changes need to be reverted:
```bash
# 1. Stop application
docker compose down
# 2. Restore database backup
pg_restore -h localhost -U portfolio -d portfolio backup.dump
# 3. Revert code
git reset --hard {commit_hash}
# 4. Run dbt at that version
cd dbt && dbt run --profiles-dir . && cd ..
# 5. Restart
docker compose up -d
```
## Health Checks
### Application Health
```bash
curl http://localhost:8050/health
```
Expected response:
```json
{"status": "healthy"}
```
### Database Health
```bash
docker compose exec postgres pg_isready -U portfolio
```
### Container Status
```bash
docker compose ps
```
## Monitoring
### View Logs
```bash
# All services
make logs
# Specific service
make logs SERVICE=postgres
# Or directly
docker compose logs -f
```
### Check Resource Usage
```bash
docker stats
```
## Troubleshooting
### Application Won't Start
1. Check container logs: `docker compose logs app`
2. Verify environment variables: `cat .env`
3. Check database connectivity: `docker compose exec postgres pg_isready`
4. Verify port availability: `lsof -i :8050`
### Database Connection Errors
1. Check postgres container: `docker compose ps postgres`
2. Verify DATABASE_URL in `.env`
3. Check postgres logs: `docker compose logs postgres`
4. Test connection: `docker compose exec postgres psql -U portfolio -c '\l'`
### dbt Failures
1. Check dbt logs: `cd dbt && dbt debug`
2. Verify profiles.yml: `cat dbt/profiles.yml`
3. Run with verbose output: `dbt run --debug`
### Out of Memory
1. Check memory usage: `free -h`
2. Review container limits in docker-compose.yml
3. Consider increasing swap or server resources
## Backup Procedures
### Database Backup
```bash
# Create backup
docker compose exec postgres pg_dump -U portfolio portfolio > backup_$(date +%Y%m%d).sql
# Compressed backup
docker compose exec postgres pg_dump -U portfolio -Fc portfolio > backup_$(date +%Y%m%d).dump
```
### Restore from Backup
```bash
# From SQL file
docker compose exec -T postgres psql -U portfolio portfolio < backup.sql
# From dump file
docker compose exec -T postgres pg_restore -U portfolio -d portfolio < backup.dump
```
## Deployment Checklist
Before deploying to production:
- [ ] All tests pass (`make test`)
- [ ] Linting passes (`make lint`)
- [ ] Staging deployment successful
- [ ] Manual testing on staging complete
- [ ] Database backup taken
- [ ] Rollback plan confirmed
- [ ] Team notified of deployment window

(chart callbacks module)

@@ -1,6 +1,7 @@
"""Chart callbacks for supporting visualizations.""" """Chart callbacks for supporting visualizations."""
# mypy: disable-error-code="misc,no-untyped-def,arg-type" # mypy: disable-error-code="misc,no-untyped-def,arg-type"
import pandas as pd
import plotly.graph_objects as go import plotly.graph_objects as go
from dash import Input, Output, callback from dash import Input, Output, callback
@@ -43,7 +44,24 @@ def update_overview_scatter(year: str) -> go.Figure:
# Compute safety score (inverse of crime rate) # Compute safety score (inverse of crime rate)
if "total_crime_rate" in merged.columns: if "total_crime_rate" in merged.columns:
max_crime = merged["total_crime_rate"].max() max_crime = merged["total_crime_rate"].max()
merged["safety_score"] = 100 - (merged["total_crime_rate"] / max_crime * 100) if max_crime and max_crime > 0:
merged["safety_score"] = 100 - (
merged["total_crime_rate"] / max_crime * 100
)
else:
merged["safety_score"] = 50 # Default if no crime data
# Fill NULL population with median or default value for sizing
if "population" in merged.columns:
median_pop = merged["population"].median()
default_pop = median_pop if pd.notna(median_pop) else 10000
merged["population"] = merged["population"].fillna(default_pop)
# Filter rows with required data for scatter plot
merged = merged.dropna(subset=["median_household_income", "safety_score"])
if merged.empty:
return _empty_chart("Insufficient data for scatter plot")
data = merged.to_dict("records") data = merged.to_dict("records")
@@ -76,12 +94,13 @@ def update_housing_trend(year: str, neighbourhood_id: int | None) -> go.Figure:
return _empty_chart("No trend data available") return _empty_chart("No trend data available")
# Placeholder for trend data - would be historical # Placeholder for trend data - would be historical
base_rent = averages.get("avg_rent_2bed") or 2000
data = [ data = [
{"year": "2019", "avg_rent": averages.get("avg_rent_2bed", 2000) * 0.85}, {"year": "2019", "avg_rent": base_rent * 0.85},
{"year": "2020", "avg_rent": averages.get("avg_rent_2bed", 2000) * 0.88}, {"year": "2020", "avg_rent": base_rent * 0.88},
{"year": "2021", "avg_rent": averages.get("avg_rent_2bed", 2000) * 0.92}, {"year": "2021", "avg_rent": base_rent * 0.92},
{"year": "2022", "avg_rent": averages.get("avg_rent_2bed", 2000) * 0.96}, {"year": "2022", "avg_rent": base_rent * 0.96},
{"year": "2023", "avg_rent": averages.get("avg_rent_2bed", 2000)}, {"year": "2023", "avg_rent": base_rent},
] ]
fig = go.Figure() fig = go.Figure()
@@ -330,10 +349,11 @@ def update_amenities_radar(year: str, neighbourhood_id: int | None) -> go.Figure
# Get city averages # Get city averages
averages = get_city_averages(year_int) averages = get_city_averages(year_int)
amenity_score = averages.get("avg_amenity_score") or 50
city_data = { city_data = {
"parks_per_1000": averages.get("avg_amenity_score", 50) / 100 * 10, "parks_per_1000": amenity_score / 100 * 10,
"schools_per_1000": averages.get("avg_amenity_score", 50) / 100 * 5, "schools_per_1000": amenity_score / 100 * 5,
"childcare_per_1000": averages.get("avg_amenity_score", 50) / 100 * 3, "childcare_per_1000": amenity_score / 100 * 3,
"transit_access": 70, "transit_access": 70,
} }

portfolio_app/toronto/loaders/__init__.py

@@ -3,7 +3,12 @@
 from .amenities import load_amenities, load_amenity_counts
 from .base import bulk_insert, get_session, upsert_by_key
 from .census import load_census_data
-from .cmhc import load_cmhc_record, load_cmhc_rentals
+from .cmhc import (
+    ensure_toronto_cma_zone,
+    load_cmhc_record,
+    load_cmhc_rentals,
+    load_statcan_cmhc_data,
+)
 from .cmhc_crosswalk import (
     build_cmhc_neighbourhood_crosswalk,
     disaggregate_zone_value,
@@ -32,6 +37,8 @@ __all__ = [
     # Fact loaders
     "load_cmhc_rentals",
     "load_cmhc_record",
+    "load_statcan_cmhc_data",
+    "ensure_toronto_cma_zone",
     # Phase 3 loaders
     "load_census_data",
     "load_crime_data",

portfolio_app/toronto/loaders/cmhc.py

@@ -1,5 +1,9 @@
"""Loader for CMHC rental data into fact_rentals.""" """Loader for CMHC rental data into fact_rentals."""
import logging
from datetime import date
from typing import Any
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from portfolio_app.toronto.models import DimCMHCZone, DimTime, FactRentals from portfolio_app.toronto.models import DimCMHCZone, DimTime, FactRentals
@@ -8,6 +12,12 @@ from portfolio_app.toronto.schemas import CMHCAnnualSurvey, CMHCRentalRecord
from .base import get_session, upsert_by_key from .base import get_session, upsert_by_key
from .dimensions import generate_date_key from .dimensions import generate_date_key
logger = logging.getLogger(__name__)
# Toronto CMA zone code for CMA-level data
TORONTO_CMA_ZONE_CODE = "TORCMA"
TORONTO_CMA_ZONE_NAME = "Toronto CMA"
def load_cmhc_rentals( def load_cmhc_rentals(
survey: CMHCAnnualSurvey, survey: CMHCAnnualSurvey,
@@ -135,3 +145,117 @@ def load_cmhc_record(
return _load(session) return _load(session)
with get_session() as sess: with get_session() as sess:
return _load(sess) return _load(sess)
+
+
+def ensure_toronto_cma_zone(session: Session | None = None) -> int:
+    """Ensure Toronto CMA zone exists in dim_cmhc_zone.
+
+    Creates the zone if it doesn't exist.
+
+    Args:
+        session: Optional existing session.
+
+    Returns:
+        The zone_key for Toronto CMA.
+    """
+
+    def _ensure(sess: Session) -> int:
+        zone = (
+            sess.query(DimCMHCZone).filter_by(zone_code=TORONTO_CMA_ZONE_CODE).first()
+        )
+        if zone:
+            return int(zone.zone_key)
+
+        # Create new zone
+        new_zone = DimCMHCZone(
+            zone_code=TORONTO_CMA_ZONE_CODE,
+            zone_name=TORONTO_CMA_ZONE_NAME,
+            geometry=None,  # CMA-level doesn't need geometry
+        )
+        sess.add(new_zone)
+        sess.flush()
+        logger.info(f"Created Toronto CMA zone with zone_key={new_zone.zone_key}")
+        return int(new_zone.zone_key)
+
+    if session:
+        return _ensure(session)
+    with get_session() as sess:
+        result = _ensure(sess)
+        sess.commit()
+        return result
+
+
+def load_statcan_cmhc_data(
+    records: list[Any],  # List of CMHCRentalRecord from statcan_cmhc parser
+    session: Session | None = None,
+) -> int:
+    """Load CMHC rental data from StatCan parser into fact_rentals.
+
+    This function handles CMA-level data from the StatCan API, which provides
+    aggregate Toronto data rather than zone-level HMIP data.
+
+    Args:
+        records: List of CMHCRentalRecord dataclass instances from statcan_cmhc parser.
+        session: Optional existing session.
+
+    Returns:
+        Number of records loaded.
+    """
+    from portfolio_app.toronto.parsers.statcan_cmhc import (
+        CMHCRentalRecord as StatCanRecord,
+    )
+
+    def _load(sess: Session) -> int:
+        # Ensure Toronto CMA zone exists
+        zone_key = ensure_toronto_cma_zone(sess)
+
+        loaded = 0
+        for record in records:
+            if not isinstance(record, StatCanRecord):
+                logger.warning(f"Skipping invalid record type: {type(record)}")
+                continue
+
+            # Generate date key for this record's survey date
+            survey_date = date(record.year, record.month, 1)
+            date_key = generate_date_key(survey_date)
+
+            # Verify time dimension exists
+            time_dim = sess.query(DimTime).filter_by(date_key=date_key).first()
+            if not time_dim:
+                logger.warning(
+                    f"Time dimension not found for {survey_date}, skipping record"
+                )
+                continue
+
+            # Create fact record
+            fact = FactRentals(
+                date_key=date_key,
+                zone_key=zone_key,
+                bedroom_type=record.bedroom_type,
+                universe=record.universe,
+                avg_rent=float(record.avg_rent) if record.avg_rent else None,
+                median_rent=None,  # StatCan doesn't provide median
+                vacancy_rate=float(record.vacancy_rate)
+                if record.vacancy_rate
+                else None,
+                availability_rate=None,
+                turnover_rate=None,
+                rent_change_pct=None,
+                reliability_code=None,
+            )
+
+            # Upsert
+            inserted, updated = upsert_by_key(
+                sess, FactRentals, [fact], ["date_key", "zone_key", "bedroom_type"]
+            )
+            loaded += inserted + updated
+
+        logger.info(f"Loaded {loaded} CMHC rental records from StatCan")
+        return loaded
+
+    if session:
+        return _load(session)
+    with get_session() as sess:
+        result = _load(sess)
+        sess.commit()
+        return result

portfolio_app/toronto/parsers/statcan_cmhc.py (new file)

@@ -0,0 +1,383 @@
"""Parser for CMHC rental data via Statistics Canada API.
Downloads rental market data (average rent, vacancy rates, universe)
from Statistics Canada's Web Data Service.
Data Sources:
- Table 34-10-0127: Vacancy rates
- Table 34-10-0129: Rental universe (total units)
- Table 34-10-0133: Average rent by bedroom type
"""
import contextlib
import io
import logging
import zipfile
from dataclasses import dataclass
from decimal import Decimal
from pathlib import Path
from typing import Any
import httpx
import pandas as pd
logger = logging.getLogger(__name__)
# StatCan Web Data Service endpoints
STATCAN_API_BASE = "https://www150.statcan.gc.ca/t1/wds/rest"
STATCAN_DOWNLOAD_BASE = "https://www150.statcan.gc.ca/n1/tbl/csv"
# CMHC table IDs
CMHC_TABLES = {
"vacancy": "34100127",
"universe": "34100129",
"rent": "34100133",
}
# Toronto CMA identifier in StatCan data
TORONTO_DGUID = "2011S0503535"
TORONTO_GEO_NAME = "Toronto, Ontario"
@dataclass
class CMHCRentalRecord:
"""Rental market record for database loading."""
year: int
month: int # CMHC surveys in October, so month=10
zone_name: str
bedroom_type: str
avg_rent: Decimal | None
vacancy_rate: Decimal | None
universe: int | None
class StatCanCMHCParser:
"""Parser for CMHC rental data from Statistics Canada.
Downloads and processes rental market survey data including:
- Average rents by bedroom type
- Vacancy rates
- Rental universe (total units)
Data is available from 1987 to present, updated annually in January.
"""
BEDROOM_TYPE_MAP = {
"Bachelor units": "bachelor",
"One bedroom units": "1bed",
"Two bedroom units": "2bed",
"Three bedroom units": "3bed",
"Total": "total",
}
STRUCTURE_FILTER = "Apartment structures of six units and over"
def __init__(
self,
cache_dir: Path | None = None,
timeout: float = 60.0,
) -> None:
"""Initialize parser.
Args:
cache_dir: Optional directory for caching downloaded files.
timeout: HTTP request timeout in seconds.
"""
self._cache_dir = cache_dir
self._timeout = timeout
self._client: httpx.Client | None = None
@property
def client(self) -> httpx.Client:
"""Lazy-initialize HTTP client."""
if self._client is None:
self._client = httpx.Client(
timeout=self._timeout,
follow_redirects=True,
)
return self._client
def close(self) -> None:
"""Close HTTP client."""
if self._client is not None:
self._client.close()
self._client = None
def __enter__(self) -> "StatCanCMHCParser":
return self
def __exit__(self, *args: Any) -> None:
self.close()
def _get_download_url(self, table_id: str) -> str:
"""Get CSV download URL for a StatCan table.
Args:
table_id: StatCan table ID (e.g., "34100133").
Returns:
Direct download URL for the CSV zip file.
"""
api_url = f"{STATCAN_API_BASE}/getFullTableDownloadCSV/{table_id}/en"
response = self.client.get(api_url)
response.raise_for_status()
data = response.json()
if data.get("status") != "SUCCESS":
raise ValueError(f"StatCan API error: {data}")
return str(data["object"])
def _download_table(self, table_id: str) -> pd.DataFrame:
"""Download and extract a StatCan table as DataFrame.
Args:
table_id: StatCan table ID.
Returns:
DataFrame with table data.
"""
# Check cache first
if self._cache_dir:
cache_file = self._cache_dir / f"{table_id}.csv"
if cache_file.exists():
logger.debug(f"Loading {table_id} from cache")
return pd.read_csv(cache_file)
# Get download URL and fetch
download_url = self._get_download_url(table_id)
logger.info(f"Downloading StatCan table {table_id}...")
response = self.client.get(download_url)
response.raise_for_status()
# Extract CSV from zip
with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
csv_name = f"{table_id}.csv"
with zf.open(csv_name) as f:
df = pd.read_csv(f)
# Cache if directory specified
if self._cache_dir:
self._cache_dir.mkdir(parents=True, exist_ok=True)
df.to_csv(self._cache_dir / f"{table_id}.csv", index=False)
logger.info(f"Downloaded {len(df)} records from table {table_id}")
return df
def _filter_toronto(self, df: pd.DataFrame) -> pd.DataFrame:
"""Filter DataFrame to Toronto CMA only.
Args:
df: Full StatCan DataFrame.
Returns:
DataFrame filtered to Toronto.
"""
# Try DGUID first, then GEO name
if "DGUID" in df.columns:
toronto_df = df[df["DGUID"] == TORONTO_DGUID]
if len(toronto_df) > 0:
return toronto_df
if "GEO" in df.columns:
return df[df["GEO"] == TORONTO_GEO_NAME]
raise ValueError("Could not identify Toronto data in DataFrame")
def get_vacancy_rates(
self,
years: list[int] | None = None,
) -> dict[int, Decimal]:
"""Fetch Toronto vacancy rates by year.
Args:
years: Optional list of years to filter.
Returns:
Dictionary mapping year to vacancy rate.
"""
df = self._download_table(CMHC_TABLES["vacancy"])
df = self._filter_toronto(df)
# Filter years if specified
if years:
df = df[df["REF_DATE"].isin(years)]
# Extract year -> rate mapping
rates = {}
for _, row in df.iterrows():
year = int(row["REF_DATE"])
value = row.get("VALUE")
if pd.notna(value):
rates[year] = Decimal(str(value))
logger.info(f"Fetched vacancy rates for {len(rates)} years")
return rates
def get_rental_universe(
self,
years: list[int] | None = None,
) -> dict[tuple[int, str], int]:
"""Fetch Toronto rental universe (total units) by year and bedroom type.
Args:
years: Optional list of years to filter.
Returns:
Dictionary mapping (year, bedroom_type) to unit count.
"""
df = self._download_table(CMHC_TABLES["universe"])
df = self._filter_toronto(df)
# Filter to standard apartment structures
if "Type of structure" in df.columns:
df = df[df["Type of structure"] == self.STRUCTURE_FILTER]
if years:
df = df[df["REF_DATE"].isin(years)]
universe = {}
for _, row in df.iterrows():
year = int(row["REF_DATE"])
bedroom_raw = row.get("Type of unit", "Total")
bedroom = self.BEDROOM_TYPE_MAP.get(bedroom_raw, "other")
value = row.get("VALUE")
if pd.notna(value) and value is not None:
universe[(year, bedroom)] = int(str(value))
logger.info(
f"Fetched rental universe for {len(universe)} year/bedroom combinations"
)
return universe
    def get_average_rents(
        self,
        years: list[int] | None = None,
    ) -> dict[tuple[int, str], Decimal]:
        """Fetch Toronto average rents by year and bedroom type.

        Args:
            years: Optional list of years to filter.

        Returns:
            Dictionary mapping (year, bedroom_type) to average rent.
        """
        df = self._download_table(CMHC_TABLES["rent"])
        df = self._filter_toronto(df)

        # Filter to standard apartment structures (most reliable data)
        if "Type of structure" in df.columns:
            df = df[df["Type of structure"] == self.STRUCTURE_FILTER]
        if years:
            df = df[df["REF_DATE"].isin(years)]

        rents = {}
        for _, row in df.iterrows():
            year = int(row["REF_DATE"])
            bedroom_raw = row.get("Type of unit", "Total")
            bedroom = self.BEDROOM_TYPE_MAP.get(bedroom_raw, "other")
            value = row.get("VALUE")
            # Skip StatCan suppression symbols: "F" (too unreliable to
            # publish) and ".." (not available)
            if pd.notna(value) and str(value) not in ("F", ".."):
                with contextlib.suppress(Exception):
                    rents[(year, bedroom)] = Decimal(str(value))

        logger.info(f"Fetched average rents for {len(rents)} year/bedroom combinations")
        return rents
    def get_all_rental_data(
        self,
        start_year: int = 2014,
        end_year: int | None = None,
    ) -> list[CMHCRentalRecord]:
        """Fetch all Toronto rental data and combine into records.

        Args:
            start_year: First year to include.
            end_year: Last year to include (defaults to current year + 1).

        Returns:
            List of CMHCRentalRecord objects ready for database loading.
        """
        import datetime

        if end_year is None:
            end_year = datetime.date.today().year + 1

        years = list(range(start_year, end_year + 1))
        logger.info(
            f"Fetching CMHC rental data for Toronto ({start_year}-{end_year})..."
        )

        # Fetch all data types
        vacancy_rates = self.get_vacancy_rates(years)
        rents = self.get_average_rents(years)
        universe = self.get_rental_universe(years)

        # Combine into records
        records = []
        bedroom_types = ["bachelor", "1bed", "2bed", "3bed"]

        for year in years:
            vacancy = vacancy_rates.get(year)
            for bedroom in bedroom_types:
                avg_rent = rents.get((year, bedroom))
                units = universe.get((year, bedroom))

                # Skip if no rent data for this year/bedroom
                if avg_rent is None:
                    continue

                records.append(
                    CMHCRentalRecord(
                        year=year,
                        month=10,  # CMHC surveys in October
                        zone_name="Toronto CMA",
                        bedroom_type=bedroom,
                        avg_rent=avg_rent,
                        vacancy_rate=vacancy,
                        universe=units,
                    )
                )

        logger.info(f"Created {len(records)} CMHC rental records")
        return records
def fetch_toronto_rental_data(
    start_year: int = 2014,
    end_year: int | None = None,
    cache_dir: Path | None = None,
) -> list[CMHCRentalRecord]:
    """Convenience function to fetch Toronto rental data.

    Args:
        start_year: First year to include.
        end_year: Last year to include.
        cache_dir: Optional cache directory.

    Returns:
        List of CMHCRentalRecord objects.
    """
    with StatCanCMHCParser(cache_dir=cache_dir) as parser:
        return parser.get_all_rental_data(start_year, end_year)
if __name__ == "__main__":
    # Test the parser
    logging.basicConfig(level=logging.INFO)
    records = fetch_toronto_rental_data(start_year=2020)
    print(f"\nFetched {len(records)} records")
    print("\nSample records:")
    for r in records[:10]:
        print(
            f"  {r.year} {r.bedroom_type}: ${r.avg_rent} rent, {r.vacancy_rate}% vacancy"
        )
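For reference, this is how the records feed the database downstream, via the load_statcan_cmhc_data loader the pipeline wires in further down this page. A minimal sketch; the open SQLAlchemy session is assumed:

from portfolio_app.toronto.loaders import load_statcan_cmhc_data
from portfolio_app.toronto.parsers.statcan_cmhc import fetch_toronto_rental_data

records = fetch_toronto_rental_data(start_year=2014)
# `session` is assumed: an open SQLAlchemy session bound to the project's engine
count = load_statcan_cmhc_data(records, session)
session.commit()
print(f"Loaded {count} CMHC rental records")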


@@ -6,6 +6,7 @@ from the City of Toronto's Open Data Portal.
 API Documentation: https://open.toronto.ca/dataset/
 """
+import contextlib
 import json
 import logging
 from decimal import Decimal
@@ -193,6 +194,9 @@ class TorontoOpenDataParser:
     def _fetch_geojson(self, package_id: str) -> dict[str, Any]:
         """Fetch GeoJSON data from a package.

+        Handles both pure GeoJSON responses and CSV responses with embedded
+        geometry columns (common in Toronto Open Data).
+
         Args:
             package_id: The package/dataset ID.
@@ -212,16 +216,65 @@ class TorontoOpenDataParser:
         response = self.client.get(url)
         response.raise_for_status()

-        data = response.json()
-        # Cache the response
-        if self._cache_dir:
-            self._cache_dir.mkdir(parents=True, exist_ok=True)
-            cache_file = self._cache_dir / f"{package_id}.geojson"
-            with open(cache_file, "w", encoding="utf-8") as f:
-                json.dump(data, f)
-        return dict(data)
+        # Try to parse as JSON first
+        try:
+            data = response.json()
+            # If it's already a valid GeoJSON FeatureCollection, return it
+            if isinstance(data, dict) and data.get("type") == "FeatureCollection":
+                if self._cache_dir:
+                    self._cache_dir.mkdir(parents=True, exist_ok=True)
+                    cache_file = self._cache_dir / f"{package_id}.geojson"
+                    with open(cache_file, "w", encoding="utf-8") as f:
+                        json.dump(data, f)
+                return dict(data)
+        except json.JSONDecodeError:
+            pass
+
+        # If JSON parsing failed, it's likely CSV with embedded geometry
+        # Parse CSV and convert to GeoJSON FeatureCollection
+        logger.info("Response is CSV format, converting to GeoJSON...")
+        import csv
+        import io
+
+        # Increase field size limit for large geometry columns
+        csv.field_size_limit(10 * 1024 * 1024)  # 10 MB
+        csv_text = response.text
+        reader = csv.DictReader(io.StringIO(csv_text))
+
+        features = []
+        for row in reader:
+            # Extract geometry from the 'geometry' column if present
+            geometry = None
+            if "geometry" in row and row["geometry"]:
+                with contextlib.suppress(json.JSONDecodeError):
+                    geometry = json.loads(row["geometry"])
+
+            # Build properties from all other columns
+            properties = {k: v for k, v in row.items() if k != "geometry"}
+
+            features.append(
+                {
+                    "type": "Feature",
+                    "geometry": geometry,
+                    "properties": properties,
+                }
+            )
+
+        geojson_data: dict[str, Any] = {
+            "type": "FeatureCollection",
+            "features": features,
+        }
+
+        # Cache the converted response
+        if self._cache_dir:
+            self._cache_dir.mkdir(parents=True, exist_ok=True)
+            cache_file = self._cache_dir / f"{package_id}.geojson"
+            with open(cache_file, "w", encoding="utf-8") as f:
+                json.dump(geojson_data, f)
+
+        return geojson_data

     def _fetch_csv_as_json(self, package_id: str) -> list[dict[str, Any]]:
         """Fetch CSV data as JSON records via CKAN datastore.
@@ -282,29 +335,32 @@ class TorontoOpenDataParser:
             props = feature.get("properties", {})
             geometry = feature.get("geometry")

-            # Extract area_id from various possible property names
-            area_id = props.get("AREA_ID") or props.get("area_id")
-            if area_id is None:
-                # Try AREA_SHORT_CODE as fallback
-                short_code = props.get("AREA_SHORT_CODE", "")
-                if short_code:
-                    # Extract numeric part
-                    area_id = int("".join(c for c in short_code if c.isdigit()) or "0")
+            # Use AREA_SHORT_CODE as the primary ID (1-158 range);
+            # AREA_ID is a large internal identifier not useful for our schema
+            short_code = props.get("AREA_SHORT_CODE") or props.get(
+                "area_short_code", ""
+            )
+            if short_code:
+                area_id = int("".join(c for c in str(short_code) if c.isdigit()) or "0")
+            else:
+                # Fall back to _id (row number) if AREA_SHORT_CODE not available
+                area_id = int(props.get("_id", 0))
+
+            if area_id == 0:
+                logger.warning(f"Skipping neighbourhood with no valid ID: {props}")
+                continue

             area_name = (
                 props.get("AREA_NAME")
                 or props.get("area_name")
                 or f"Neighbourhood {area_id}"
             )
-            area_short_code = props.get("AREA_SHORT_CODE") or props.get(
-                "area_short_code"
-            )

             records.append(
                 NeighbourhoodRecord(
-                    area_id=int(area_id),
+                    area_id=area_id,
                     area_name=str(area_name),
-                    area_short_code=area_short_code,
+                    area_short_code=str(short_code) if short_code else None,
                     geometry=geometry,
                 )
             )
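The digit-extraction that produces the ID is worth a worked example (hypothetical helper name, same expression as the diff):

def code_to_id(short_code) -> int:
    # Keep only the digits; default to 0 when none survive
    return int("".join(c for c in str(short_code) if c.isdigit()) or "0")

print(code_to_id("001"))   # 1
print(code_to_id("158"))   # 158
print(code_to_id("N/A"))   # 0, so the row is skipped with a warning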
@@ -314,17 +370,17 @@ class TorontoOpenDataParser:
     # Mapping of indicator names to CensusRecord fields
     # Keys are partial matches (case-insensitive) found in the "Characteristic" column
+    # Order matters - first match wins, so more specific patterns come first
+    # Note: owner/renter counts are raw numbers, not percentages - calculated in dbt
     CENSUS_INDICATOR_MAPPING: dict[str, str] = {
         "population, 2021": "population",
         "population, 2016": "population",
         "population density per square kilometre": "population_density",
-        "median total income of household": "median_household_income",
-        "average total income of household": "average_household_income",
+        "median total income of households in": "median_household_income",
+        "average total income of households in": "average_household_income",
         "unemployment rate": "unemployment_rate",
         "bachelor's degree or higher": "pct_bachelors_or_higher",
-        "owner": "pct_owner_occupied",
-        "renter": "pct_renter_occupied",
-        "median age": "median_age",
+        "average age": "median_age",
         "average value of dwellings": "average_dwelling_value",
     }
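The mapping relies on ordered, case-insensitive substring matching where the first hit wins (Python dicts preserve insertion order). A sketch of that lookup; the helper name is assumed, not part of the diff:

def match_indicator(characteristic: str, mapping: dict[str, str]) -> str | None:
    # First pattern contained in the text wins, so specific keys
    # must be declared before generic ones
    text = characteristic.lower()
    for pattern, field in mapping.items():
        if pattern in text:
            return field
    return None

mapping = {"median total income of households in": "median_household_income"}
print(match_indicator("Median total income of households in 2020 ($)", mapping))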
@@ -358,17 +414,31 @@ class TorontoOpenDataParser:
         logger.info(f"Fetched {len(raw_records)} census profile rows")

         # Find the characteristic/indicator column name
+        # Prioritize "Characteristic" over "Category" since both may exist
         sample_row = raw_records[0]
         char_col = None
-        for col in sample_row:
-            col_lower = col.lower()
-            if "characteristic" in col_lower or "category" in col_lower:
-                char_col = col
-                break
+
+        # First try exact match for Characteristic
+        if "Characteristic" in sample_row:
+            char_col = "Characteristic"
+        else:
+            # Fall back to pattern matching
+            for col in sample_row:
+                col_lower = col.lower()
+                if "characteristic" in col_lower:
+                    char_col = col
+                    break
+            # Last resort: try Category
+            if not char_col:
+                for col in sample_row:
+                    if "category" in col.lower():
+                        char_col = col
+                        break

         if not char_col:
-            # Try common column names
-            for candidate in ["Characteristic", "Category", "Topic", "_id"]:
+            # Try other common column names
+            for candidate in ["Topic", "_id"]:
                 if candidate in sample_row:
                     char_col = candidate
                     break

@@ -37,7 +37,7 @@ def get_neighbourhoods_geojson(year: int = 2021) -> dict[str, Any]:
             ST_AsGeoJSON(geometry)::json as geom,
             population,
             livability_score
-        FROM mart_neighbourhood_overview
+        FROM public_marts.mart_neighbourhood_overview
         WHERE year = :year
             AND geometry IS NOT NULL
     """

@@ -1,5 +1,6 @@
 """Service layer for querying neighbourhood data from dbt marts."""
+import logging
 from functools import lru_cache
 from typing import Any
@@ -8,6 +9,8 @@ from sqlalchemy import text
 from portfolio_app.toronto.models import get_engine

+logger = logging.getLogger(__name__)
+

 def _execute_query(sql: str, params: dict[str, Any] | None = None) -> pd.DataFrame:
     """Execute SQL query and return DataFrame.
@@ -23,8 +26,10 @@ def _execute_query(sql: str, params: dict[str, Any] | None = None) -> pd.DataFrame:
         engine = get_engine()
         with engine.connect() as conn:
             return pd.read_sql(text(sql), conn, params=params)
-    except Exception:
-        # Return empty DataFrame on connection or query error
+    except Exception as e:
+        logger.error(f"Query failed: {e}")
+        logger.debug(f"Failed SQL: {sql}")
+        logger.debug(f"Params: {params}")
         return pd.DataFrame()
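With this fail-soft behaviour callers receive an empty frame rather than an exception, so dashboard pages can degrade gracefully. A sketch of the calling pattern, using a function defined later in this file:

df = get_overview_data(year=2021)
if df.empty:
    # The failure was already logged by _execute_query; render a placeholder
    print("No data available for 2021")
else:
    print(df.head())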
@@ -56,7 +61,7 @@ def get_overview_data(year: int = 2021) -> pd.DataFrame:
             rent_to_income_pct,
             avg_rent_2bed,
             total_amenities_per_1000
-        FROM mart_neighbourhood_overview
+        FROM public_marts.mart_neighbourhood_overview
         WHERE year = :year
         ORDER BY livability_score DESC NULLS LAST
     """
@@ -95,7 +100,7 @@ def get_housing_data(year: int = 2021) -> pd.DataFrame:
             affordability_index,
             rent_yoy_change_pct,
             income_quintile
-        FROM mart_neighbourhood_housing
+        FROM public_marts.mart_neighbourhood_housing
         WHERE year = :year
         ORDER BY affordability_index ASC NULLS LAST
     """
@@ -112,26 +117,22 @@ def get_safety_data(year: int = 2021) -> pd.DataFrame:
     Returns:
         DataFrame with columns: neighbourhood_id, neighbourhood_name,
-        total_crime_rate, violent_crime_rate, property_crime_rate, etc.
+        total_crime_rate, violent_crimes, property_crimes, etc.
     """
     sql = """
         SELECT
             neighbourhood_id,
             neighbourhood_name,
             year,
-            total_crimes,
+            total_incidents as total_crimes,
             crime_rate_per_100k as total_crime_rate,
-            violent_crimes,
-            violent_crime_rate,
-            property_crimes,
-            property_crime_rate,
-            theft_crimes,
-            theft_rate,
-            crime_yoy_change_pct,
-            crime_trend
-        FROM mart_neighbourhood_safety
+            assault_count + robbery_count + homicide_count as violent_crimes,
+            break_enter_count + auto_theft_count as property_crimes,
+            theft_over_count as theft_crimes,
+            crime_yoy_change_pct
+        FROM public_marts.mart_neighbourhood_safety
         WHERE year = :year
-        ORDER BY total_crime_rate ASC NULLS LAST
+        ORDER BY crime_rate_per_100k ASC NULLS LAST
     """

     return _execute_query(sql, {"year": year})
@@ -152,22 +153,22 @@ def get_demographics_data(year: int = 2021) -> pd.DataFrame:
         SELECT
             neighbourhood_id,
             neighbourhood_name,
-            census_year as year,
+            year,
             population,
             population_density,
-            population_change_pct,
             median_household_income,
             average_household_income,
             income_quintile,
+            income_index,
             median_age,
-            pct_under_18,
-            pct_18_to_64,
-            pct_65_plus,
-            pct_bachelors_or_higher,
+            age_index,
+            pct_owner_occupied,
+            pct_renter_occupied,
+            education_bachelors_pct as pct_bachelors_or_higher,
             unemployment_rate,
-            diversity_index
-        FROM mart_neighbourhood_demographics
-        WHERE census_year = :year
+            tenure_diversity_index as diversity_index
+        FROM public_marts.mart_neighbourhood_demographics
+        WHERE year = :year
         ORDER BY population DESC NULLS LAST
     """

     return _execute_query(sql, {"year": year})
@@ -183,26 +184,26 @@ def get_amenities_data(year: int = 2021) -> pd.DataFrame:
     Returns:
         DataFrame with columns: neighbourhood_id, neighbourhood_name,
-        amenity_score, parks_per_capita, schools_per_capita, transit_score, etc.
+        amenity_score, parks_per_1000, schools_per_1000, etc.
     """
     sql = """
         SELECT
             neighbourhood_id,
             neighbourhood_name,
             year,
-            park_count,
+            parks_count as park_count,
             parks_per_1000,
-            school_count,
+            schools_count as school_count,
             schools_per_1000,
-            childcare_count,
-            childcare_per_1000,
+            transit_count as childcare_count,
+            transit_per_1000 as childcare_per_1000,
             total_amenities,
             total_amenities_per_1000,
-            amenity_score,
-            amenity_rank
-        FROM mart_neighbourhood_amenities
+            amenity_index as amenity_score,
+            amenity_tier as amenity_rank
+        FROM public_marts.mart_neighbourhood_amenities
         WHERE year = :year
-        ORDER BY amenity_score DESC NULLS LAST
+        ORDER BY amenity_index DESC NULLS LAST
     """

     return _execute_query(sql, {"year": year})
@@ -249,17 +250,17 @@ def get_neighbourhood_details(
             a.park_count,
             a.school_count,
             a.total_amenities
-        FROM mart_neighbourhood_overview o
-        LEFT JOIN mart_neighbourhood_safety s
+        FROM public_marts.mart_neighbourhood_overview o
+        LEFT JOIN public_marts.mart_neighbourhood_safety s
             ON o.neighbourhood_id = s.neighbourhood_id
             AND o.year = s.year
-        LEFT JOIN mart_neighbourhood_housing h
+        LEFT JOIN public_marts.mart_neighbourhood_housing h
             ON o.neighbourhood_id = h.neighbourhood_id
             AND o.year = h.year
-        LEFT JOIN mart_neighbourhood_demographics d
+        LEFT JOIN public_marts.mart_neighbourhood_demographics d
             ON o.neighbourhood_id = d.neighbourhood_id
             AND o.year = d.census_year
-        LEFT JOIN mart_neighbourhood_amenities a
+        LEFT JOIN public_marts.mart_neighbourhood_amenities a
             ON o.neighbourhood_id = a.neighbourhood_id
             AND o.year = a.year
         WHERE o.neighbourhood_id = :neighbourhood_id
@@ -288,7 +289,7 @@ def get_neighbourhood_list(year: int = 2021) -> list[dict[str, Any]]:
             neighbourhood_id,
             neighbourhood_name,
             population
-        FROM mart_neighbourhood_overview
+        FROM public_marts.mart_neighbourhood_overview
         WHERE year = :year
         ORDER BY neighbourhood_name
     """
@@ -317,19 +318,19 @@ def get_rankings(
     """
     # Map metrics to their source tables
     table_map = {
-        "livability_score": "mart_neighbourhood_overview",
-        "safety_score": "mart_neighbourhood_overview",
-        "affordability_score": "mart_neighbourhood_overview",
-        "amenity_score": "mart_neighbourhood_overview",
-        "crime_rate_per_100k": "mart_neighbourhood_safety",
-        "total_crime_rate": "mart_neighbourhood_safety",
-        "avg_rent_2bed": "mart_neighbourhood_housing",
-        "affordability_index": "mart_neighbourhood_housing",
-        "population": "mart_neighbourhood_demographics",
-        "median_household_income": "mart_neighbourhood_demographics",
+        "livability_score": "public_marts.mart_neighbourhood_overview",
+        "safety_score": "public_marts.mart_neighbourhood_overview",
+        "affordability_score": "public_marts.mart_neighbourhood_overview",
+        "amenity_score": "public_marts.mart_neighbourhood_overview",
+        "crime_rate_per_100k": "public_marts.mart_neighbourhood_safety",
+        "total_crime_rate": "public_marts.mart_neighbourhood_safety",
+        "avg_rent_2bed": "public_marts.mart_neighbourhood_housing",
+        "affordability_index": "public_marts.mart_neighbourhood_housing",
+        "population": "public_marts.mart_neighbourhood_demographics",
+        "median_household_income": "public_marts.mart_neighbourhood_demographics",
     }

-    table = table_map.get(metric, "mart_neighbourhood_overview")
+    table = table_map.get(metric, "public_marts.mart_neighbourhood_overview")
     year_col = "census_year" if "demographics" in table else "year"
     order = "ASC" if ascending else "DESC"
@@ -375,7 +376,7 @@ def get_city_averages(year: int = 2021) -> dict[str, Any]:
             AVG(crime_rate_per_100k) as avg_crime_rate,
             AVG(avg_rent_2bed) as avg_rent_2bed,
             AVG(rent_to_income_pct) as avg_rent_to_income
-        FROM mart_neighbourhood_overview
+        FROM public_marts.mart_neighbourhood_overview
         WHERE year = :year
     """

     df = _execute_query(sql, {"year": year})
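Every query above now targets the public_marts schema that dbt writes to. If a query unexpectedly comes back empty, a quick way to confirm the marts actually landed there (standard information_schema lookup, not part of this commit):

from sqlalchemy import text
from portfolio_app.toronto.models import get_engine

with get_engine().connect() as conn:
    rows = conn.execute(text(
        "SELECT table_name FROM information_schema.tables "
        "WHERE table_schema = 'public_marts' ORDER BY table_name"
    ))
    for (name,) in rows:
        print(name)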

@@ -38,12 +38,16 @@ from portfolio_app.toronto.loaders import (  # noqa: E402
     load_census_data,
     load_crime_data,
     load_neighbourhoods,
+    load_statcan_cmhc_data,
     load_time_dimension,
 )
 from portfolio_app.toronto.parsers import (  # noqa: E402
     TorontoOpenDataParser,
     TorontoPoliceParser,
 )
+from portfolio_app.toronto.parsers.statcan_cmhc import (  # noqa: E402
+    fetch_toronto_rental_data,
+)
 from portfolio_app.toronto.schemas import Neighbourhood  # noqa: E402

 # Configure logging
# Configure logging # Configure logging
@@ -91,6 +95,9 @@ class DataPipeline:
         # 5. Load amenities
         self._load_amenities(session)

+        # 6. Load CMHC rental data from StatCan
+        self._load_rentals(session)
+
         session.commit()
         logger.info("All data committed to database")
@@ -241,6 +248,32 @@ class DataPipeline:
         self.stats["amenities"] = total_count

+    def _load_rentals(self, session: Any) -> None:
+        """Fetch and load CMHC rental data from StatCan."""
+        logger.info("Fetching CMHC rental data from Statistics Canada...")
+
+        if self.dry_run:
+            logger.info("  [DRY RUN] Would fetch and load CMHC rental data")
+            return
+
+        try:
+            # Fetch rental data (2014-present)
+            rental_records = fetch_toronto_rental_data(start_year=2014)
+
+            if not rental_records:
+                logger.warning("  No rental records fetched")
+                return
+
+            count = load_statcan_cmhc_data(rental_records, session)
+            self.stats["rentals"] = count
+            logger.info(f"  Loaded {count} CMHC rental records")
+        except Exception as e:
+            logger.warning(f"  Failed to load CMHC rental data: {e}")
+            if self.verbose:
+                import traceback
+
+                traceback.print_exc()
+
     def run_dbt(self) -> bool:
         """Run dbt to transform data.

@@ -25,8 +25,10 @@ def main() -> int:
         engine = get_engine()

         # Test connection
+        from sqlalchemy import text
+
         with engine.connect() as conn:
-            result = conn.execute("SELECT 1")
+            result = conn.execute(text("SELECT 1"))
             result.fetchone()

         print("Database connection successful")

scripts/etl/toronto.sh (new executable file, 72 lines)

@@ -0,0 +1,72 @@
#!/usr/bin/env bash
# scripts/etl/toronto.sh - Run Toronto data pipeline
#
# Usage:
# ./scripts/etl/toronto.sh --full # Complete reload of all data
# ./scripts/etl/toronto.sh --incremental # Only new data since last run
# ./scripts/etl/toronto.sh # Default: incremental
#
# Logs are written to .dev/logs/etl/
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"
LOG_DIR="$PROJECT_ROOT/.dev/logs/etl"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
LOG_FILE="$LOG_DIR/toronto_${TIMESTAMP}.log"
MODE="${1:---incremental}"
mkdir -p "$LOG_DIR"
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}
log "Starting Toronto ETL pipeline (mode: $MODE)"
log "Log file: $LOG_FILE"
cd "$PROJECT_ROOT"
# Activate virtual environment if it exists
if [ -d ".venv" ]; then
    source .venv/bin/activate
    log "Activated virtual environment"
fi
case "$MODE" in
--full)
log "Running FULL data reload..."
log "Step 1/4: Parsing neighbourhood data..."
python -m portfolio_app.toronto.parsers.neighbourhoods 2>&1 | tee -a "$LOG_FILE"
log "Step 2/4: Parsing census data..."
python -m portfolio_app.toronto.parsers.census 2>&1 | tee -a "$LOG_FILE"
log "Step 3/4: Parsing crime data..."
python -m portfolio_app.toronto.parsers.crime 2>&1 | tee -a "$LOG_FILE"
log "Step 4/4: Running dbt transformations..."
cd dbt && dbt run --full-refresh --profiles-dir . 2>&1 | tee -a "$LOG_FILE" && cd ..
;;
--incremental)
log "Running INCREMENTAL update..."
log "Step 1/2: Checking for new data..."
# Add incremental logic here when implemented
log "Step 2/2: Running dbt transformations..."
cd dbt && dbt run --profiles-dir . 2>&1 | tee -a "$LOG_FILE" && cd ..
;;
*)
log "ERROR: Unknown mode '$MODE'. Use --full or --incremental"
exit 1
;;
esac
log "Toronto ETL pipeline completed successfully"
log "Full log available at: $LOG_FILE"

scripts/logs.sh (new executable file, 20 lines)

@@ -0,0 +1,20 @@
#!/usr/bin/env bash
# scripts/logs.sh - Follow docker compose logs
#
# Usage:
# ./scripts/logs.sh # All services
# ./scripts/logs.sh postgres # Specific service
# ./scripts/logs.sh -n 100 # Last 100 lines
set -euo pipefail
SERVICE="${1:-}"

if [[ -n "$SERVICE" && "$SERVICE" != -* ]]; then
    echo "Following logs for service: $SERVICE"
    # Quote the expansions so extra flags survive word splitting intact
    docker compose logs -f "$SERVICE" "${@:2}"
else
    echo "Following logs for all services"
    docker compose logs -f "$@"
fi

scripts/run-detached.sh (new executable file, 38 lines)

@@ -0,0 +1,38 @@
#!/usr/bin/env bash
# scripts/run-detached.sh - Start containers and wait for health
#
# Usage:
# ./scripts/run-detached.sh
set -euo pipefail
TIMEOUT=60
INTERVAL=5
echo "Starting containers in detached mode..."
docker compose up -d
echo "Waiting for services to become healthy..."
elapsed=0
while [ "$elapsed" -lt "$TIMEOUT" ]; do
    # Check if postgres is ready
    if docker compose exec -T postgres pg_isready -U portfolio > /dev/null 2>&1; then
        echo "PostgreSQL is ready!"

        # Check if app health endpoint responds (if running)
        if curl -sf http://localhost:8050/health > /dev/null 2>&1; then
            echo "Application health check passed!"
            echo "All services are healthy."
            exit 0
        fi
    fi

    echo "Waiting... ($elapsed/$TIMEOUT seconds)"
    sleep "$INTERVAL"
    elapsed=$((elapsed + INTERVAL))
done
echo "ERROR: Health check timed out after $TIMEOUT seconds"
docker compose ps
exit 1