feat: Complete Phase 5 dashboard implementation
Implement full 5-tab Toronto Neighbourhood Dashboard with real data connectivity:

Dashboard Structure:
- Overview tab with livability scores and rankings
- Housing tab with affordability metrics
- Safety tab with crime statistics
- Demographics tab with population/income data
- Amenities tab with parks, schools, transit

Figure Factories (portfolio_app/figures/):
- bar_charts.py: ranking, stacked, horizontal bars
- scatter.py: scatter plots, bubble charts
- radar.py: spider/radar charts
- demographics.py: donut, age pyramid, income distribution

Service Layer (portfolio_app/toronto/services/):
- neighbourhood_service.py: queries dbt marts for all tab data
- geometry_service.py: generates GeoJSON from PostGIS
- Graceful error handling when database unavailable

Callbacks (portfolio_app/pages/toronto/callbacks/):
- map_callbacks.py: choropleth updates, map click handling
- chart_callbacks.py: supporting chart updates
- selection_callbacks.py: dropdown handlers, KPI updates

Data Pipeline (scripts/data/):
- load_toronto_data.py: orchestration script with CLI flags

Lessons Learned:
- Graceful error handling in service layers (see the sketch below)
- Modular callback structure for multi-tab dashboards
- Figure factory pattern for reusable charts (see the sketch below)

Closes: #64, #65, #66, #67, #68, #69, #70

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
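Two sketches expand the lessons above. They are illustrative only, not code from this commit's diff; any name or signature marked hypothetical is an assumption.

The figure factory pattern: module-level functions that build and return fully configured Plotly figures, so every tab shares one styling path. The committed factories live in portfolio_app/figures/bar_charts.py; this function name and signature are hypothetical.

    import plotly.graph_objects as go

    def ranking_bar(names: list[str], scores: list[float], title: str) -> go.Figure:
        """Horizontal ranking bar chart (hypothetical API, not the committed one)."""
        fig = go.Figure(go.Bar(x=scores, y=names, orientation="h"))
        # Sort categories by value so the top-ranked neighbourhood appears first
        fig.update_layout(title=title, yaxis={"categoryorder": "total ascending"})
        return fig

Graceful error handling in the service layer: catch connection failures and return empty results so the dashboard renders placeholders instead of crashing. get_session is real (the loader script below imports it); the service function and mart name are hypothetical.

    import logging

    from sqlalchemy import text
    from sqlalchemy.exc import OperationalError

    from portfolio_app.toronto.loaders import get_session

    logger = logging.getLogger(__name__)

    def get_overview_rows() -> list[dict]:
        """Query a dbt mart; return [] when the database is unreachable."""
        try:
            with get_session() as session:
                rows = session.execute(text("SELECT * FROM mart_neighbourhood_overview"))
                return [dict(row._mapping) for row in rows]
        except OperationalError:
            logger.warning("Database unavailable; returning empty overview data")
            return []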
scripts/data/load_toronto_data.py (new file, 367 lines)
@@ -0,0 +1,367 @@
#!/usr/bin/env python3
"""Load Toronto neighbourhood data into the database.

Usage:
    python scripts/data/load_toronto_data.py [OPTIONS]

Options:
    --skip-fetch    Skip API fetching, only run dbt
    --skip-dbt      Skip dbt run, only load data
    --dry-run       Show what would be done without executing
    -v, --verbose   Enable verbose logging

This script orchestrates:
    1. Fetching data from Toronto Open Data and CMHC APIs
    2. Loading data into PostgreSQL fact tables
    3. Running dbt to transform staging -> intermediate -> marts

Exit codes:
    0 = Success
    1 = Error
"""

import argparse
import logging
import subprocess
import sys
from datetime import date
from pathlib import Path
from typing import Any

# Add project root to path
PROJECT_ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

from portfolio_app.toronto.loaders import (  # noqa: E402
    get_session,
    load_amenities,
    load_census_data,
    load_crime_data,
    load_neighbourhoods,
    load_time_dimension,
)
from portfolio_app.toronto.parsers import (  # noqa: E402
    TorontoOpenDataParser,
    TorontoPoliceParser,
)
from portfolio_app.toronto.schemas import Neighbourhood  # noqa: E402

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)


class DataPipeline:
    """Orchestrates data loading from APIs to database to dbt."""

    def __init__(self, dry_run: bool = False, verbose: bool = False):
        self.dry_run = dry_run
        self.verbose = verbose
        self.stats: dict[str, int] = {}

        if verbose:
            logging.getLogger().setLevel(logging.DEBUG)

    def fetch_and_load(self) -> bool:
        """Fetch data from APIs and load into database.

        Returns:
            True if successful, False otherwise.
        """
        logger.info("Starting data fetch and load pipeline...")

        try:
            with get_session() as session:
                # 1. Load time dimension first (for date keys)
                self._load_time_dimension(session)

                # 2. Load neighbourhoods (required for foreign keys)
                self._load_neighbourhoods(session)

                # 3. Load census data
                self._load_census(session)

                # 4. Load crime data
                self._load_crime(session)

                # 5. Load amenities
                self._load_amenities(session)

                session.commit()
                logger.info("All data committed to database")

            self._print_stats()
            return True

        except Exception as e:
            logger.error(f"Pipeline failed: {e}")
            if self.verbose:
                import traceback

                traceback.print_exc()
            return False

    def _load_time_dimension(self, session: Any) -> None:
        """Load time dimension with date range for dashboard."""
        logger.info("Loading time dimension...")

        if self.dry_run:
            logger.info(
                " [DRY RUN] Would load time dimension 2019-01-01 to 2025-12-01"
            )
            return

        count = load_time_dimension(
            start_date=date(2019, 1, 1),
            end_date=date(2025, 12, 1),
            session=session,
        )
        self.stats["time_dimension"] = count
        logger.info(f" Loaded {count} time dimension records")

    def _load_neighbourhoods(self, session: Any) -> None:
        """Fetch and load neighbourhood boundaries."""
        logger.info("Fetching neighbourhoods from Toronto Open Data...")

        if self.dry_run:
            logger.info(" [DRY RUN] Would fetch and load neighbourhoods")
            return

        import json

        parser = TorontoOpenDataParser()
        raw_neighbourhoods = parser.get_neighbourhoods()

        # Convert NeighbourhoodRecord to Neighbourhood schema
        neighbourhoods = []
        for n in raw_neighbourhoods:
            # Serialize the GeoJSON geometry dict if present (despite the
            # field name, geometry_wkt carries a GeoJSON string here)
            geometry_wkt = None
            if n.geometry:
                # Store as GeoJSON string for PostGIS ST_GeomFromGeoJSON
                geometry_wkt = json.dumps(n.geometry)

            neighbourhood = Neighbourhood(
                neighbourhood_id=n.area_id,
                name=n.area_name,
                geometry_wkt=geometry_wkt,
                population=None,  # Will be filled from census data
                land_area_sqkm=None,
                pop_density_per_sqkm=None,
                census_year=2021,
            )
            neighbourhoods.append(neighbourhood)

        count = load_neighbourhoods(neighbourhoods, session)
        self.stats["neighbourhoods"] = count
        logger.info(f" Loaded {count} neighbourhoods")

    def _load_census(self, session: Any) -> None:
        """Fetch and load census profile data."""
        logger.info("Fetching census profiles from Toronto Open Data...")

        if self.dry_run:
            logger.info(" [DRY RUN] Would fetch and load census data")
            return

        parser = TorontoOpenDataParser()
        census_records = parser.get_census_profiles(year=2021)

        if not census_records:
            logger.warning(" No census records fetched")
            return

        count = load_census_data(census_records, session)
        self.stats["census"] = count
        logger.info(f" Loaded {count} census records")

    def _load_crime(self, session: Any) -> None:
        """Fetch and load crime statistics."""
        logger.info("Fetching crime data from Toronto Police Service...")

        if self.dry_run:
            logger.info(" [DRY RUN] Would fetch and load crime data")
            return

        parser = TorontoPoliceParser()
        crime_records = parser.get_crime_rates()

        if not crime_records:
            logger.warning(" No crime records fetched")
            return

        count = load_crime_data(crime_records, session)
        self.stats["crime"] = count
        logger.info(f" Loaded {count} crime records")

    def _load_amenities(self, session: Any) -> None:
        """Fetch and load amenity data (parks, schools, childcare)."""
        logger.info("Fetching amenities from Toronto Open Data...")

        if self.dry_run:
            logger.info(" [DRY RUN] Would fetch and load amenity data")
            return

        parser = TorontoOpenDataParser()
        total_count = 0

        # Fetch parks
        try:
            parks = parser.get_parks()
            if parks:
                count = load_amenities(parks, year=2024, session=session)
                total_count += count
                logger.info(f" Loaded {count} park amenities")
        except Exception as e:
            logger.warning(f" Failed to load parks: {e}")

        # Fetch schools
        try:
            schools = parser.get_schools()
            if schools:
                count = load_amenities(schools, year=2024, session=session)
                total_count += count
                logger.info(f" Loaded {count} school amenities")
        except Exception as e:
            logger.warning(f" Failed to load schools: {e}")

        # Fetch childcare centres
        try:
            childcare = parser.get_childcare_centres()
            if childcare:
                count = load_amenities(childcare, year=2024, session=session)
                total_count += count
                logger.info(f" Loaded {count} childcare amenities")
        except Exception as e:
            logger.warning(f" Failed to load childcare: {e}")

        self.stats["amenities"] = total_count

    def run_dbt(self) -> bool:
        """Run dbt to transform data.

        Returns:
            True if successful, False otherwise.
        """
        logger.info("Running dbt transformations...")

        dbt_project_dir = PROJECT_ROOT / "dbt"

        if not dbt_project_dir.exists():
            logger.error(f"dbt project directory not found: {dbt_project_dir}")
            return False

        if self.dry_run:
            logger.info(" [DRY RUN] Would run: dbt run")
            logger.info(" [DRY RUN] Would run: dbt test")
            return True

        try:
            # Run dbt models
            logger.info(" Running dbt run...")
            result = subprocess.run(
                ["dbt", "run"],
                cwd=dbt_project_dir,
                capture_output=True,
                text=True,
            )

            if result.returncode != 0:
                logger.error(f"dbt run failed:\n{result.stderr}")
                if self.verbose:
                    logger.debug(f"dbt output:\n{result.stdout}")
                return False

            logger.info(" dbt run completed successfully")

            # Run dbt tests
            logger.info(" Running dbt test...")
            result = subprocess.run(
                ["dbt", "test"],
                cwd=dbt_project_dir,
                capture_output=True,
                text=True,
            )

            if result.returncode != 0:
                logger.warning(f"dbt test had failures:\n{result.stderr}")
                # Don't fail on test failures, just warn
            else:
                logger.info(" dbt test completed successfully")

            return True

        except FileNotFoundError:
            logger.error(
                "dbt not found in PATH. Install with: pip install dbt-postgres"
            )
            return False
        except Exception as e:
            logger.error(f"dbt execution failed: {e}")
            return False

    def _print_stats(self) -> None:
        """Print loading statistics."""
        if not self.stats:
            return

        logger.info("Loading statistics:")
        for key, count in self.stats.items():
            logger.info(f" {key}: {count} records")


def main() -> int:
    """Main entry point for the data loading script."""
    parser = argparse.ArgumentParser(
        description="Load Toronto neighbourhood data into the database",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )

    parser.add_argument(
        "--skip-fetch",
        action="store_true",
        help="Skip API fetching, only run dbt",
    )
    parser.add_argument(
        "--skip-dbt",
        action="store_true",
        help="Skip dbt run, only load data",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without executing",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Enable verbose logging",
    )

    args = parser.parse_args()

    if args.skip_fetch and args.skip_dbt:
        logger.error("Cannot skip both fetch and dbt - nothing to do")
        return 1

    pipeline = DataPipeline(dry_run=args.dry_run, verbose=args.verbose)

    # Execute pipeline stages
    if not args.skip_fetch and not pipeline.fetch_and_load():
        return 1

    if not args.skip_dbt and not pipeline.run_dbt():
        return 1

    logger.info("Pipeline completed successfully!")
    return 0


if __name__ == "__main__":
    sys.exit(main())
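For reference, typical invocations of the script above, using the flags defined in main():

    # Preview every pipeline stage without touching the database
    python scripts/data/load_toronto_data.py --dry-run -v

    # Data already loaded; rebuild only the dbt models
    python scripts/data/load_toronto_data.py --skip-fetch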