Some checks failed
CI / lint-and-test (push) Has been cancelled
Updates seed_amenity_data.py to also seed median_age values in fact_census where missing, ensuring demographics notebooks work. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
182 lines
4.7 KiB
Python
182 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Seed sample data for development/testing.
|
|
|
|
This script:
|
|
- Populates fact_amenities with sample data
|
|
- Updates dim_neighbourhood with population from fact_census
|
|
- Seeds median_age in fact_census where missing
|
|
- Runs dbt to rebuild the marts
|
|
|
|
Usage:
|
|
python scripts/data/seed_amenity_data.py
|
|
|
|
Run this after load_toronto_data.py to ensure notebooks have data.
|
|
"""
|
|
|
|
import os
|
|
import random
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
from dotenv import load_dotenv
|
|
from sqlalchemy import create_engine, text
|
|
|
|
PROJECT_ROOT = Path(__file__).parent.parent.parent
|
|
load_dotenv(PROJECT_ROOT / ".env")
|
|
|
|
DATABASE_URL = os.environ.get("DATABASE_URL")
|
|
if not DATABASE_URL:
|
|
print("ERROR: DATABASE_URL not set in .env")
|
|
sys.exit(1)
|
|
|
|
|
|
def seed_amenities() -> int:
|
|
"""Insert sample amenity data for all neighbourhoods."""
|
|
engine = create_engine(DATABASE_URL)
|
|
|
|
with engine.connect() as conn:
|
|
result = conn.execute(
|
|
text("SELECT neighbourhood_id FROM public.dim_neighbourhood")
|
|
)
|
|
neighbourhood_ids = [row[0] for row in result]
|
|
|
|
print(f"Found {len(neighbourhood_ids)} neighbourhoods")
|
|
|
|
amenity_types = [
|
|
"Parks",
|
|
"Schools",
|
|
"Transit Stops",
|
|
"Libraries",
|
|
"Community Centres",
|
|
"Recreation",
|
|
]
|
|
year = 2024
|
|
|
|
with engine.begin() as conn:
|
|
conn.execute(text("DELETE FROM public.fact_amenities"))
|
|
|
|
total = 0
|
|
for n_id in neighbourhood_ids:
|
|
for amenity_type in amenity_types:
|
|
count = random.randint(1, 50)
|
|
conn.execute(
|
|
text(
|
|
"""
|
|
INSERT INTO public.fact_amenities
|
|
(neighbourhood_id, amenity_type, count, year)
|
|
VALUES (:neighbourhood_id, :amenity_type, :count, :year)
|
|
"""
|
|
),
|
|
{
|
|
"neighbourhood_id": n_id,
|
|
"amenity_type": amenity_type,
|
|
"count": count,
|
|
"year": year,
|
|
},
|
|
)
|
|
total += 1
|
|
|
|
print(f"Inserted {total} amenity records")
|
|
return total
|
|
|
|
|
|
def update_population() -> int:
|
|
"""Update dim_neighbourhood with population from fact_census."""
|
|
engine = create_engine(DATABASE_URL)
|
|
|
|
with engine.begin() as conn:
|
|
result = conn.execute(
|
|
text(
|
|
"""
|
|
UPDATE public.dim_neighbourhood dn
|
|
SET population = fc.population
|
|
FROM public.fact_census fc
|
|
WHERE dn.neighbourhood_id = fc.neighbourhood_id
|
|
AND fc.census_year = 2021
|
|
"""
|
|
)
|
|
)
|
|
count = int(result.rowcount)
|
|
|
|
print(f"Updated {count} neighbourhoods with population")
|
|
return count
|
|
|
|
|
|
def seed_median_age() -> int:
|
|
"""Seed median_age in fact_census where missing."""
|
|
engine = create_engine(DATABASE_URL)
|
|
|
|
with engine.begin() as conn:
|
|
result = conn.execute(
|
|
text("SELECT id FROM public.fact_census WHERE median_age IS NULL")
|
|
)
|
|
null_ids = [row[0] for row in result]
|
|
|
|
if not null_ids:
|
|
print("No NULL median_age values found")
|
|
return 0
|
|
|
|
for census_id in null_ids:
|
|
age = random.randint(30, 50)
|
|
conn.execute(
|
|
text("UPDATE public.fact_census SET median_age = :age WHERE id = :id"),
|
|
{"age": age, "id": census_id},
|
|
)
|
|
|
|
print(f"Seeded median_age for {len(null_ids)} census records")
|
|
return len(null_ids)
|
|
|
|
|
|
def run_dbt() -> bool:
|
|
"""Run dbt to rebuild marts."""
|
|
dbt_dir = PROJECT_ROOT / "dbt"
|
|
venv_dbt = PROJECT_ROOT / ".venv" / "bin" / "dbt"
|
|
dbt_cmd = str(venv_dbt) if venv_dbt.exists() else "dbt"
|
|
|
|
print("Running dbt to rebuild marts...")
|
|
|
|
env = os.environ.copy()
|
|
|
|
result = subprocess.run(
|
|
[
|
|
dbt_cmd,
|
|
"run",
|
|
"--profiles-dir",
|
|
str(dbt_dir),
|
|
"--select",
|
|
"+mart_neighbourhood_amenities +mart_neighbourhood_demographics",
|
|
],
|
|
cwd=dbt_dir,
|
|
capture_output=True,
|
|
text=True,
|
|
env=env,
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
print(f"dbt failed:\n{result.stdout}\n{result.stderr}")
|
|
return False
|
|
|
|
print("dbt completed successfully")
|
|
return True
|
|
|
|
|
|
def main() -> int:
|
|
"""Main entry point."""
|
|
print("Seeding development data...")
|
|
|
|
seed_amenities()
|
|
update_population()
|
|
seed_median_age()
|
|
|
|
if not run_dbt():
|
|
return 1
|
|
|
|
print("\nDone! Development data is ready.")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
result = main()
|
|
sys.exit(result)
|