- Add StatCan CMHC parser to fetch rental data from Statistics Canada API - Create year spine (2014-2025) as time dimension driver instead of census - Add CMA-level rental and income intermediate models - Update mart_neighbourhood_overview to use rental years as base - Fix neighbourhood_service queries to match dbt schema - Add CMHC data loading to pipeline script Data now flows correctly: 158 neighbourhoods × 12 years = 1,896 records Rent data available 2019-2025, crime data 2014-2024 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
384 lines
11 KiB
Python
384 lines
11 KiB
Python
"""Parser for CMHC rental data via Statistics Canada API.
|
|
|
|
Downloads rental market data (average rent, vacancy rates, universe)
|
|
from Statistics Canada's Web Data Service.
|
|
|
|
Data Sources:
|
|
- Table 34-10-0127: Vacancy rates
|
|
- Table 34-10-0129: Rental universe (total units)
|
|
- Table 34-10-0133: Average rent by bedroom type
|
|
"""
|
|
|
|
import contextlib
|
|
import io
|
|
import logging
|
|
import zipfile
|
|
from dataclasses import dataclass
|
|
from decimal import Decimal
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import httpx
|
|
import pandas as pd
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# StatCan Web Data Service endpoints
|
|
STATCAN_API_BASE = "https://www150.statcan.gc.ca/t1/wds/rest"
|
|
STATCAN_DOWNLOAD_BASE = "https://www150.statcan.gc.ca/n1/tbl/csv"
|
|
|
|
# CMHC table IDs
|
|
CMHC_TABLES = {
|
|
"vacancy": "34100127",
|
|
"universe": "34100129",
|
|
"rent": "34100133",
|
|
}
|
|
|
|
# Toronto CMA identifier in StatCan data
|
|
TORONTO_DGUID = "2011S0503535"
|
|
TORONTO_GEO_NAME = "Toronto, Ontario"
|
|
|
|
|
|
@dataclass
|
|
class CMHCRentalRecord:
|
|
"""Rental market record for database loading."""
|
|
|
|
year: int
|
|
month: int # CMHC surveys in October, so month=10
|
|
zone_name: str
|
|
bedroom_type: str
|
|
avg_rent: Decimal | None
|
|
vacancy_rate: Decimal | None
|
|
universe: int | None
|
|
|
|
|
|
class StatCanCMHCParser:
|
|
"""Parser for CMHC rental data from Statistics Canada.
|
|
|
|
Downloads and processes rental market survey data including:
|
|
- Average rents by bedroom type
|
|
- Vacancy rates
|
|
- Rental universe (total units)
|
|
|
|
Data is available from 1987 to present, updated annually in January.
|
|
"""
|
|
|
|
BEDROOM_TYPE_MAP = {
|
|
"Bachelor units": "bachelor",
|
|
"One bedroom units": "1bed",
|
|
"Two bedroom units": "2bed",
|
|
"Three bedroom units": "3bed",
|
|
"Total": "total",
|
|
}
|
|
|
|
STRUCTURE_FILTER = "Apartment structures of six units and over"
|
|
|
|
def __init__(
|
|
self,
|
|
cache_dir: Path | None = None,
|
|
timeout: float = 60.0,
|
|
) -> None:
|
|
"""Initialize parser.
|
|
|
|
Args:
|
|
cache_dir: Optional directory for caching downloaded files.
|
|
timeout: HTTP request timeout in seconds.
|
|
"""
|
|
self._cache_dir = cache_dir
|
|
self._timeout = timeout
|
|
self._client: httpx.Client | None = None
|
|
|
|
@property
|
|
def client(self) -> httpx.Client:
|
|
"""Lazy-initialize HTTP client."""
|
|
if self._client is None:
|
|
self._client = httpx.Client(
|
|
timeout=self._timeout,
|
|
follow_redirects=True,
|
|
)
|
|
return self._client
|
|
|
|
def close(self) -> None:
|
|
"""Close HTTP client."""
|
|
if self._client is not None:
|
|
self._client.close()
|
|
self._client = None
|
|
|
|
def __enter__(self) -> "StatCanCMHCParser":
|
|
return self
|
|
|
|
def __exit__(self, *args: Any) -> None:
|
|
self.close()
|
|
|
|
def _get_download_url(self, table_id: str) -> str:
|
|
"""Get CSV download URL for a StatCan table.
|
|
|
|
Args:
|
|
table_id: StatCan table ID (e.g., "34100133").
|
|
|
|
Returns:
|
|
Direct download URL for the CSV zip file.
|
|
"""
|
|
api_url = f"{STATCAN_API_BASE}/getFullTableDownloadCSV/{table_id}/en"
|
|
response = self.client.get(api_url)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
if data.get("status") != "SUCCESS":
|
|
raise ValueError(f"StatCan API error: {data}")
|
|
|
|
return str(data["object"])
|
|
|
|
def _download_table(self, table_id: str) -> pd.DataFrame:
|
|
"""Download and extract a StatCan table as DataFrame.
|
|
|
|
Args:
|
|
table_id: StatCan table ID.
|
|
|
|
Returns:
|
|
DataFrame with table data.
|
|
"""
|
|
# Check cache first
|
|
if self._cache_dir:
|
|
cache_file = self._cache_dir / f"{table_id}.csv"
|
|
if cache_file.exists():
|
|
logger.debug(f"Loading {table_id} from cache")
|
|
return pd.read_csv(cache_file)
|
|
|
|
# Get download URL and fetch
|
|
download_url = self._get_download_url(table_id)
|
|
logger.info(f"Downloading StatCan table {table_id}...")
|
|
|
|
response = self.client.get(download_url)
|
|
response.raise_for_status()
|
|
|
|
# Extract CSV from zip
|
|
with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
|
|
csv_name = f"{table_id}.csv"
|
|
with zf.open(csv_name) as f:
|
|
df = pd.read_csv(f)
|
|
|
|
# Cache if directory specified
|
|
if self._cache_dir:
|
|
self._cache_dir.mkdir(parents=True, exist_ok=True)
|
|
df.to_csv(self._cache_dir / f"{table_id}.csv", index=False)
|
|
|
|
logger.info(f"Downloaded {len(df)} records from table {table_id}")
|
|
return df
|
|
|
|
def _filter_toronto(self, df: pd.DataFrame) -> pd.DataFrame:
|
|
"""Filter DataFrame to Toronto CMA only.
|
|
|
|
Args:
|
|
df: Full StatCan DataFrame.
|
|
|
|
Returns:
|
|
DataFrame filtered to Toronto.
|
|
"""
|
|
# Try DGUID first, then GEO name
|
|
if "DGUID" in df.columns:
|
|
toronto_df = df[df["DGUID"] == TORONTO_DGUID]
|
|
if len(toronto_df) > 0:
|
|
return toronto_df
|
|
|
|
if "GEO" in df.columns:
|
|
return df[df["GEO"] == TORONTO_GEO_NAME]
|
|
|
|
raise ValueError("Could not identify Toronto data in DataFrame")
|
|
|
|
def get_vacancy_rates(
|
|
self,
|
|
years: list[int] | None = None,
|
|
) -> dict[int, Decimal]:
|
|
"""Fetch Toronto vacancy rates by year.
|
|
|
|
Args:
|
|
years: Optional list of years to filter.
|
|
|
|
Returns:
|
|
Dictionary mapping year to vacancy rate.
|
|
"""
|
|
df = self._download_table(CMHC_TABLES["vacancy"])
|
|
df = self._filter_toronto(df)
|
|
|
|
# Filter years if specified
|
|
if years:
|
|
df = df[df["REF_DATE"].isin(years)]
|
|
|
|
# Extract year -> rate mapping
|
|
rates = {}
|
|
for _, row in df.iterrows():
|
|
year = int(row["REF_DATE"])
|
|
value = row.get("VALUE")
|
|
if pd.notna(value):
|
|
rates[year] = Decimal(str(value))
|
|
|
|
logger.info(f"Fetched vacancy rates for {len(rates)} years")
|
|
return rates
|
|
|
|
def get_rental_universe(
|
|
self,
|
|
years: list[int] | None = None,
|
|
) -> dict[tuple[int, str], int]:
|
|
"""Fetch Toronto rental universe (total units) by year and bedroom type.
|
|
|
|
Args:
|
|
years: Optional list of years to filter.
|
|
|
|
Returns:
|
|
Dictionary mapping (year, bedroom_type) to unit count.
|
|
"""
|
|
df = self._download_table(CMHC_TABLES["universe"])
|
|
df = self._filter_toronto(df)
|
|
|
|
# Filter to standard apartment structures
|
|
if "Type of structure" in df.columns:
|
|
df = df[df["Type of structure"] == self.STRUCTURE_FILTER]
|
|
|
|
if years:
|
|
df = df[df["REF_DATE"].isin(years)]
|
|
|
|
universe = {}
|
|
for _, row in df.iterrows():
|
|
year = int(row["REF_DATE"])
|
|
bedroom_raw = row.get("Type of unit", "Total")
|
|
bedroom = self.BEDROOM_TYPE_MAP.get(bedroom_raw, "other")
|
|
value = row.get("VALUE")
|
|
|
|
if pd.notna(value) and value is not None:
|
|
universe[(year, bedroom)] = int(str(value))
|
|
|
|
logger.info(
|
|
f"Fetched rental universe for {len(universe)} year/bedroom combinations"
|
|
)
|
|
return universe
|
|
|
|
def get_average_rents(
|
|
self,
|
|
years: list[int] | None = None,
|
|
) -> dict[tuple[int, str], Decimal]:
|
|
"""Fetch Toronto average rents by year and bedroom type.
|
|
|
|
Args:
|
|
years: Optional list of years to filter.
|
|
|
|
Returns:
|
|
Dictionary mapping (year, bedroom_type) to average rent.
|
|
"""
|
|
df = self._download_table(CMHC_TABLES["rent"])
|
|
df = self._filter_toronto(df)
|
|
|
|
# Filter to standard apartment structures (most reliable data)
|
|
if "Type of structure" in df.columns:
|
|
df = df[df["Type of structure"] == self.STRUCTURE_FILTER]
|
|
|
|
if years:
|
|
df = df[df["REF_DATE"].isin(years)]
|
|
|
|
rents = {}
|
|
for _, row in df.iterrows():
|
|
year = int(row["REF_DATE"])
|
|
bedroom_raw = row.get("Type of unit", "Total")
|
|
bedroom = self.BEDROOM_TYPE_MAP.get(bedroom_raw, "other")
|
|
value = row.get("VALUE")
|
|
|
|
if pd.notna(value) and str(value) not in ("F", ".."):
|
|
with contextlib.suppress(Exception):
|
|
rents[(year, bedroom)] = Decimal(str(value))
|
|
|
|
logger.info(f"Fetched average rents for {len(rents)} year/bedroom combinations")
|
|
return rents
|
|
|
|
def get_all_rental_data(
|
|
self,
|
|
start_year: int = 2014,
|
|
end_year: int | None = None,
|
|
) -> list[CMHCRentalRecord]:
|
|
"""Fetch all Toronto rental data and combine into records.
|
|
|
|
Args:
|
|
start_year: First year to include.
|
|
end_year: Last year to include (defaults to current year + 1).
|
|
|
|
Returns:
|
|
List of CMHCRentalRecord objects ready for database loading.
|
|
"""
|
|
import datetime
|
|
|
|
if end_year is None:
|
|
end_year = datetime.date.today().year + 1
|
|
|
|
years = list(range(start_year, end_year + 1))
|
|
|
|
logger.info(
|
|
f"Fetching CMHC rental data for Toronto ({start_year}-{end_year})..."
|
|
)
|
|
|
|
# Fetch all data types
|
|
vacancy_rates = self.get_vacancy_rates(years)
|
|
rents = self.get_average_rents(years)
|
|
universe = self.get_rental_universe(years)
|
|
|
|
# Combine into records
|
|
records = []
|
|
bedroom_types = ["bachelor", "1bed", "2bed", "3bed"]
|
|
|
|
for year in years:
|
|
vacancy = vacancy_rates.get(year)
|
|
|
|
for bedroom in bedroom_types:
|
|
avg_rent = rents.get((year, bedroom))
|
|
units = universe.get((year, bedroom))
|
|
|
|
# Skip if no rent data for this year/bedroom
|
|
if avg_rent is None:
|
|
continue
|
|
|
|
records.append(
|
|
CMHCRentalRecord(
|
|
year=year,
|
|
month=10, # CMHC surveys in October
|
|
zone_name="Toronto CMA",
|
|
bedroom_type=bedroom,
|
|
avg_rent=avg_rent,
|
|
vacancy_rate=vacancy,
|
|
universe=units,
|
|
)
|
|
)
|
|
|
|
logger.info(f"Created {len(records)} CMHC rental records")
|
|
return records
|
|
|
|
|
|
def fetch_toronto_rental_data(
|
|
start_year: int = 2014,
|
|
end_year: int | None = None,
|
|
cache_dir: Path | None = None,
|
|
) -> list[CMHCRentalRecord]:
|
|
"""Convenience function to fetch Toronto rental data.
|
|
|
|
Args:
|
|
start_year: First year to include.
|
|
end_year: Last year to include.
|
|
cache_dir: Optional cache directory.
|
|
|
|
Returns:
|
|
List of CMHCRentalRecord objects.
|
|
"""
|
|
with StatCanCMHCParser(cache_dir=cache_dir) as parser:
|
|
return parser.get_all_rental_data(start_year, end_year)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Test the parser
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
records = fetch_toronto_rental_data(start_year=2020)
|
|
|
|
print(f"\nFetched {len(records)} records")
|
|
print("\nSample records:")
|
|
for r in records[:10]:
|
|
print(
|
|
f" {r.year} {r.bedroom_type}: ${r.avg_rent} rent, {r.vacancy_rate}% vacancy"
|
|
)
|