fix: Repair data pipeline with StatCan CMHC rental data

- Add StatCan CMHC parser to fetch rental data from Statistics Canada API
- Create year spine (2014-2025) as time dimension driver instead of census
- Add CMA-level rental and income intermediate models
- Update mart_neighbourhood_overview to use rental years as base
- Fix neighbourhood_service queries to match dbt schema
- Add CMHC data loading to pipeline script

Data now flows correctly: 158 neighbourhoods × 12 years = 1,896 records
Rent data available 2019-2025, crime data 2014-2024

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
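The record count above is just the cross product of the year spine and the neighbourhood dimension. A minimal pandas sketch of that join logic (illustrative only; the actual base model is dbt SQL):

import pandas as pd

# Year spine drives the time dimension: 2014-2025 inclusive is 12 years
year_spine = pd.DataFrame({"year": range(2014, 2026)})
# One row per neighbourhood (Toronto's current geography has 158)
neighbourhoods = pd.DataFrame({"area_id": range(1, 159)})
# Cross join: every neighbourhood appears once per year
base = neighbourhoods.merge(year_spine, how="cross")
assert len(base) == 158 * 12  # 1,896 records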
2026-01-17 15:38:31 -05:00
parent 4818c53fd2
commit d0f32edba7
21 changed files with 955 additions and 156 deletions

@@ -0,0 +1,383 @@
"""Parser for CMHC rental data via Statistics Canada API.
Downloads rental market data (average rent, vacancy rates, universe)
from Statistics Canada's Web Data Service.
Data Sources:
- Table 34-10-0127: Vacancy rates
- Table 34-10-0129: Rental universe (total units)
- Table 34-10-0133: Average rent by bedroom type
"""
import contextlib
import datetime
import io
import logging
import zipfile
from dataclasses import dataclass
from decimal import Decimal
from pathlib import Path
from typing import Any
import httpx
import pandas as pd
logger = logging.getLogger(__name__)
# StatCan Web Data Service endpoints
STATCAN_API_BASE = "https://www150.statcan.gc.ca/t1/wds/rest"
STATCAN_DOWNLOAD_BASE = "https://www150.statcan.gc.ca/n1/tbl/csv"
# CMHC table IDs
CMHC_TABLES = {
"vacancy": "34100127",
"universe": "34100129",
"rent": "34100133",
}
# Toronto CMA identifier in StatCan data
TORONTO_DGUID = "2011S0503535"
TORONTO_GEO_NAME = "Toronto, Ontario"
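# NOTE: the Web Data Service download is a two-step exchange. The REST call
# returns JSON whose "object" field is a direct link to a zipped CSV, which is
# then fetched as-is. Illustrative exchange (the zip URL is generated per
# request; the one below only shows its shape):
#   GET {STATCAN_API_BASE}/getFullTableDownloadCSV/34100133/en
#   -> {"status": "SUCCESS", "object": "https://.../34100133-eng.zip"}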
@dataclass
class CMHCRentalRecord:
"""Rental market record for database loading."""
year: int
month: int # CMHC surveys in October, so month=10
zone_name: str
bedroom_type: str
avg_rent: Decimal | None
vacancy_rate: Decimal | None
universe: int | None
class StatCanCMHCParser:
"""Parser for CMHC rental data from Statistics Canada.
Downloads and processes rental market survey data including:
- Average rents by bedroom type
- Vacancy rates
- Rental universe (total units)
Data is available from 1987 to present, updated annually in January.
"""
BEDROOM_TYPE_MAP = {
"Bachelor units": "bachelor",
"One bedroom units": "1bed",
"Two bedroom units": "2bed",
"Three bedroom units": "3bed",
"Total": "total",
}
STRUCTURE_FILTER = "Apartment structures of six units and over"
def __init__(
self,
cache_dir: Path | None = None,
timeout: float = 60.0,
) -> None:
"""Initialize parser.
Args:
cache_dir: Optional directory for caching downloaded files.
timeout: HTTP request timeout in seconds.
"""
self._cache_dir = cache_dir
self._timeout = timeout
self._client: httpx.Client | None = None
@property
def client(self) -> httpx.Client:
"""Lazy-initialize HTTP client."""
if self._client is None:
self._client = httpx.Client(
timeout=self._timeout,
follow_redirects=True,
)
return self._client
def close(self) -> None:
"""Close HTTP client."""
if self._client is not None:
self._client.close()
self._client = None
def __enter__(self) -> "StatCanCMHCParser":
return self
def __exit__(self, *args: Any) -> None:
self.close()
def _get_download_url(self, table_id: str) -> str:
"""Get CSV download URL for a StatCan table.
Args:
table_id: StatCan table ID (e.g., "34100133").
Returns:
Direct download URL for the CSV zip file.
"""
api_url = f"{STATCAN_API_BASE}/getFullTableDownloadCSV/{table_id}/en"
response = self.client.get(api_url)
response.raise_for_status()
data = response.json()
if data.get("status") != "SUCCESS":
raise ValueError(f"StatCan API error: {data}")
return str(data["object"])
def _download_table(self, table_id: str) -> pd.DataFrame:
"""Download and extract a StatCan table as DataFrame.
Args:
table_id: StatCan table ID.
Returns:
DataFrame with table data.
"""
# Check cache first
if self._cache_dir:
cache_file = self._cache_dir / f"{table_id}.csv"
if cache_file.exists():
logger.debug(f"Loading {table_id} from cache")
return pd.read_csv(cache_file)
# Get download URL and fetch
download_url = self._get_download_url(table_id)
logger.info(f"Downloading StatCan table {table_id}...")
response = self.client.get(download_url)
response.raise_for_status()
# Extract CSV from zip
with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
csv_name = f"{table_id}.csv"
with zf.open(csv_name) as f:
df = pd.read_csv(f)
# Cache if directory specified
if self._cache_dir:
self._cache_dir.mkdir(parents=True, exist_ok=True)
df.to_csv(self._cache_dir / f"{table_id}.csv", index=False)
logger.info(f"Downloaded {len(df)} records from table {table_id}")
return df
def _filter_toronto(self, df: pd.DataFrame) -> pd.DataFrame:
"""Filter DataFrame to Toronto CMA only.
Args:
df: Full StatCan DataFrame.
Returns:
DataFrame filtered to Toronto.
"""
# Try DGUID first, then GEO name
if "DGUID" in df.columns:
toronto_df = df[df["DGUID"] == TORONTO_DGUID]
if len(toronto_df) > 0:
return toronto_df
if "GEO" in df.columns:
return df[df["GEO"] == TORONTO_GEO_NAME]
raise ValueError("Could not identify Toronto data in DataFrame")
def get_vacancy_rates(
self,
years: list[int] | None = None,
) -> dict[int, Decimal]:
"""Fetch Toronto vacancy rates by year.
Args:
years: Optional list of years to filter.
Returns:
Dictionary mapping year to vacancy rate.
"""
df = self._download_table(CMHC_TABLES["vacancy"])
df = self._filter_toronto(df)
# Filter years if specified
if years:
df = df[df["REF_DATE"].isin(years)]
# Extract year -> rate mapping
rates = {}
for _, row in df.iterrows():
year = int(row["REF_DATE"])
value = row.get("VALUE")
if pd.notna(value):
rates[year] = Decimal(str(value))
logger.info(f"Fetched vacancy rates for {len(rates)} years")
return rates
def get_rental_universe(
self,
years: list[int] | None = None,
) -> dict[tuple[int, str], int]:
"""Fetch Toronto rental universe (total units) by year and bedroom type.
Args:
years: Optional list of years to filter.
Returns:
Dictionary mapping (year, bedroom_type) to unit count.
"""
df = self._download_table(CMHC_TABLES["universe"])
df = self._filter_toronto(df)
# Filter to standard apartment structures
if "Type of structure" in df.columns:
df = df[df["Type of structure"] == self.STRUCTURE_FILTER]
if years:
df = df[df["REF_DATE"].isin(years)]
universe = {}
for _, row in df.iterrows():
year = int(row["REF_DATE"])
bedroom_raw = row.get("Type of unit", "Total")
bedroom = self.BEDROOM_TYPE_MAP.get(bedroom_raw, "other")
value = row.get("VALUE")
if pd.notna(value):
universe[(year, bedroom)] = int(float(value))
logger.info(
f"Fetched rental universe for {len(universe)} year/bedroom combinations"
)
return universe
def get_average_rents(
self,
years: list[int] | None = None,
) -> dict[tuple[int, str], Decimal]:
"""Fetch Toronto average rents by year and bedroom type.
Args:
years: Optional list of years to filter.
Returns:
Dictionary mapping (year, bedroom_type) to average rent.
"""
df = self._download_table(CMHC_TABLES["rent"])
df = self._filter_toronto(df)
# Filter to standard apartment structures (most reliable data)
if "Type of structure" in df.columns:
df = df[df["Type of structure"] == self.STRUCTURE_FILTER]
if years:
df = df[df["REF_DATE"].isin(years)]
rents = {}
for _, row in df.iterrows():
year = int(row["REF_DATE"])
bedroom_raw = row.get("Type of unit", "Total")
bedroom = self.BEDROOM_TYPE_MAP.get(bedroom_raw, "other")
value = row.get("VALUE")
if pd.notna(value) and str(value) not in ("F", ".."):
with contextlib.suppress(Exception):
rents[(year, bedroom)] = Decimal(str(value))
logger.info(f"Fetched average rents for {len(rents)} year/bedroom combinations")
return rents
def get_all_rental_data(
self,
start_year: int = 2014,
end_year: int | None = None,
) -> list[CMHCRentalRecord]:
"""Fetch all Toronto rental data and combine into records.
Args:
start_year: First year to include.
end_year: Last year to include (defaults to current year + 1).
Returns:
List of CMHCRentalRecord objects ready for database loading.
"""
if end_year is None:
end_year = datetime.date.today().year + 1
years = list(range(start_year, end_year + 1))
logger.info(
f"Fetching CMHC rental data for Toronto ({start_year}-{end_year})..."
)
# Fetch all data types
vacancy_rates = self.get_vacancy_rates(years)
rents = self.get_average_rents(years)
universe = self.get_rental_universe(years)
# Combine into records
records = []
bedroom_types = ["bachelor", "1bed", "2bed", "3bed"]
for year in years:
vacancy = vacancy_rates.get(year)
for bedroom in bedroom_types:
avg_rent = rents.get((year, bedroom))
units = universe.get((year, bedroom))
# Skip if no rent data for this year/bedroom
if avg_rent is None:
continue
records.append(
CMHCRentalRecord(
year=year,
month=10, # CMHC surveys in October
zone_name="Toronto CMA",
bedroom_type=bedroom,
avg_rent=avg_rent,
vacancy_rate=vacancy,
universe=units,
)
)
logger.info(f"Created {len(records)} CMHC rental records")
return records
def fetch_toronto_rental_data(
start_year: int = 2014,
end_year: int | None = None,
cache_dir: Path | None = None,
) -> list[CMHCRentalRecord]:
"""Convenience function to fetch Toronto rental data.
Args:
start_year: First year to include.
end_year: Last year to include.
cache_dir: Optional cache directory.
Returns:
List of CMHCRentalRecord objects.
"""
with StatCanCMHCParser(cache_dir=cache_dir) as parser:
return parser.get_all_rental_data(start_year, end_year)
if __name__ == "__main__":
# Test the parser
logging.basicConfig(level=logging.INFO)
records = fetch_toronto_rental_data(start_year=2020)
print(f"\nFetched {len(records)} records")
print("\nSample records:")
for r in records[:10]:
print(
f" {r.year} {r.bedroom_type}: ${r.avg_rent} rent, {r.vacancy_rate}% vacancy"
)

@@ -6,6 +6,7 @@ from the City of Toronto's Open Data Portal.
API Documentation: https://open.toronto.ca/dataset/
"""
import contextlib
import json
import logging
from decimal import Decimal
@@ -193,6 +194,9 @@ class TorontoOpenDataParser:
def _fetch_geojson(self, package_id: str) -> dict[str, Any]:
"""Fetch GeoJSON data from a package.
Handles both pure GeoJSON responses and CSV responses with embedded
geometry columns (common in Toronto Open Data).
Args:
package_id: The package/dataset ID.
@@ -212,16 +216,65 @@ class TorontoOpenDataParser:
response = self.client.get(url)
response.raise_for_status()
# Try to parse as JSON first
try:
data = response.json()
# If it's already a valid GeoJSON FeatureCollection, return it
if isinstance(data, dict) and data.get("type") == "FeatureCollection":
if self._cache_dir:
self._cache_dir.mkdir(parents=True, exist_ok=True)
cache_file = self._cache_dir / f"{package_id}.geojson"
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(data, f)
return dict(data)
except json.JSONDecodeError:
pass
# If JSON parsing failed, it's likely CSV with embedded geometry
# Parse CSV and convert to GeoJSON FeatureCollection
logger.info("Response is CSV format, converting to GeoJSON...")
import csv
import io
# Increase field size limit for large geometry columns
csv.field_size_limit(10 * 1024 * 1024) # 10 MB
csv_text = response.text
reader = csv.DictReader(io.StringIO(csv_text))
features = []
for row in reader:
# Extract geometry from the 'geometry' column if present
geometry = None
if "geometry" in row and row["geometry"]:
with contextlib.suppress(json.JSONDecodeError):
geometry = json.loads(row["geometry"])
# Build properties from all other columns
properties = {k: v for k, v in row.items() if k != "geometry"}
features.append(
{
"type": "Feature",
"geometry": geometry,
"properties": properties,
}
)
geojson_data: dict[str, Any] = {
"type": "FeatureCollection",
"features": features,
}
# Cache the converted response
if self._cache_dir:
self._cache_dir.mkdir(parents=True, exist_ok=True)
cache_file = self._cache_dir / f"{package_id}.geojson"
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(geojson_data, f)
return geojson_data
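# Worked example of the conversion above (values shortened; "Agincourt North"
# is just a sample neighbourhood): a CSV row such as
#   {"_id": "1", "AREA_NAME": "Agincourt North", "AREA_SHORT_CODE": "129",
#    "geometry": '{"type": "MultiPolygon", "coordinates": [...]}'}
# becomes
#   {"type": "Feature",
#    "geometry": {"type": "MultiPolygon", "coordinates": [...]},
#    "properties": {"_id": "1", "AREA_NAME": "Agincourt North",
#                   "AREA_SHORT_CODE": "129"}}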
def _fetch_csv_as_json(self, package_id: str) -> list[dict[str, Any]]:
"""Fetch CSV data as JSON records via CKAN datastore.
@@ -282,29 +335,32 @@ class TorontoOpenDataParser:
props = feature.get("properties", {})
geometry = feature.get("geometry")
# Use AREA_SHORT_CODE as the primary ID (1-158 range)
# AREA_ID is a large internal identifier not useful for our schema
short_code = props.get("AREA_SHORT_CODE") or props.get(
"area_short_code", ""
)
if short_code:
area_id = int("".join(c for c in str(short_code) if c.isdigit()) or "0")
else:
# Fallback to _id (row number) if AREA_SHORT_CODE not available
area_id = int(props.get("_id", 0))
if area_id == 0:
logger.warning(f"Skipping neighbourhood with no valid ID: {props}")
continue
area_name = (
props.get("AREA_NAME")
or props.get("area_name")
or f"Neighbourhood {area_id}"
)
records.append(
NeighbourhoodRecord(
area_id=area_id,
area_name=str(area_name),
area_short_code=str(short_code) if short_code else None,
geometry=geometry,
)
)
@@ -314,17 +370,17 @@ class TorontoOpenDataParser:
# Mapping of indicator names to CensusRecord fields
# Keys are partial matches (case-insensitive) found in the "Characteristic" column
# Order matters - first match wins, so more specific patterns come first
# Note: owner/renter counts are raw numbers, not percentages - calculated in dbt
CENSUS_INDICATOR_MAPPING: dict[str, str] = {
"population, 2021": "population",
"population, 2016": "population",
"population density per square kilometre": "population_density",
"median total income of household": "median_household_income",
"average total income of household": "average_household_income",
"median total income of households in": "median_household_income",
"average total income of households in": "average_household_income",
"unemployment rate": "unemployment_rate",
"bachelor's degree or higher": "pct_bachelors_or_higher",
"owner": "pct_owner_occupied",
"renter": "pct_renter_occupied",
"median age": "median_age",
"average age": "median_age",
"average value of dwellings": "average_dwelling_value",
}
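# Since matching is ordered and partial, a lookup helper over this mapping
# plausibly reads like the following (hypothetical sketch, not part of this
# commit):
#   def _map_characteristic(text: str) -> str | None:
#       lowered = text.lower()
#       for pattern, field in CENSUS_INDICATOR_MAPPING.items():
#           if pattern in lowered:
#               return field
#       return None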
@@ -358,17 +414,31 @@ class TorontoOpenDataParser:
logger.info(f"Fetched {len(raw_records)} census profile rows")
# Find the characteristic/indicator column name
# Prioritize "Characteristic" over "Category" since both may exist
sample_row = raw_records[0]
char_col = None
# First try exact match for Characteristic
if "Characteristic" in sample_row:
char_col = "Characteristic"
else:
# Fall back to pattern matching
for col in sample_row:
col_lower = col.lower()
if "characteristic" in col_lower:
char_col = col
break
# Last resort: try Category
if not char_col:
for col in sample_row:
if "category" in col.lower():
char_col = col
break
if not char_col:
# Try other common column names
for candidate in ["Topic", "_id"]:
if candidate in sample_row:
char_col = candidate
break