This comprehensive update transforms Job Forge from a generic MVP concept to a production-ready Python/FastAPI web application prototype with complete documentation, testing infrastructure, and deployment procedures. ## 🏗️ Architecture Changes - Updated all documentation to reflect Python/FastAPI + Dash + PostgreSQL stack - Transformed from MVP concept to deployable web application prototype - Added comprehensive multi-tenant architecture with Row Level Security (RLS) - Integrated Claude API and OpenAI API for AI-powered document generation ## 📚 Documentation Overhaul - **CLAUDE.md**: Complete rewrite as project orchestrator for 4 specialized agents - **README.md**: New centralized documentation hub with organized navigation - **API Specification**: Updated with comprehensive FastAPI endpoint documentation - **Database Design**: Enhanced schema with RLS policies and performance optimization - **Architecture Guide**: Transformed to web application focus with deployment strategy ## 🏗️ New Documentation Structure - **docs/development/**: Python/FastAPI coding standards and development guidelines - **docs/infrastructure/**: Docker setup and server deployment procedures - **docs/testing/**: Comprehensive QA procedures with pytest integration - **docs/ai/**: AI prompt templates and examples (preserved from original) ## 🎯 Team Structure Updates - **.claude/agents/**: 4 new Python/FastAPI specialized agents - simplified_technical_lead.md: Architecture and technical guidance - fullstack_developer.md: FastAPI backend + Dash frontend implementation - simplified_qa.md: pytest testing and quality assurance - simplified_devops.md: Docker deployment and server infrastructure ## 🧪 Testing Infrastructure - **pytest.ini**: Complete pytest configuration with coverage requirements - **tests/conftest.py**: Comprehensive test fixtures and database setup - **tests/unit/**: Example unit tests for auth and application services - **tests/integration/**: API integration test examples - Support for async testing, AI service mocking, and database testing ## 🧹 Cleanup - Removed 9 duplicate/outdated documentation files - Eliminated conflicting technology references (Node.js/TypeScript) - Consolidated overlapping content into comprehensive guides - Cleaned up project structure for professional development workflow ## 🚀 Production Ready Features - Docker containerization for development and production - Server deployment procedures for prototype hosting - Security best practices with JWT authentication and RLS - Performance optimization with database indexing and caching - Comprehensive testing strategy with quality gates This update establishes Job Forge as a professional Python/FastAPI web application prototype ready for development and deployment. 🤖 Generated with Claude Code (https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
469 lines
16 KiB
Python
469 lines
16 KiB
Python
# Integration tests for API authentication
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
from httpx import AsyncClient
|
|
import json
|
|
|
|
from app.main import app
|
|
from app.core.security import create_access_token
|
|
|
|
|
|
class TestAuthenticationEndpoints:
|
|
"""Test authentication API endpoints."""
|
|
|
|
def test_register_user_success(self, test_client):
|
|
"""Test successful user registration."""
|
|
|
|
user_data = {
|
|
"email": "newuser@test.com",
|
|
"password": "securepassword123",
|
|
"first_name": "New",
|
|
"last_name": "User"
|
|
}
|
|
|
|
response = test_client.post("/api/auth/register", json=user_data)
|
|
|
|
assert response.status_code == 201
|
|
data = response.json()
|
|
assert data["email"] == "newuser@test.com"
|
|
assert data["first_name"] == "New"
|
|
assert data["last_name"] == "User"
|
|
assert "password" not in data # Password should not be returned
|
|
assert "access_token" in data
|
|
assert "token_type" in data
|
|
|
|
def test_register_user_duplicate_email(self, test_client, test_user):
|
|
"""Test registration with duplicate email."""
|
|
|
|
user_data = {
|
|
"email": test_user.email, # Use existing user's email
|
|
"password": "differentpassword",
|
|
"first_name": "Duplicate",
|
|
"last_name": "User"
|
|
}
|
|
|
|
response = test_client.post("/api/auth/register", json=user_data)
|
|
|
|
assert response.status_code == 400
|
|
data = response.json()
|
|
assert "email already registered" in data["detail"].lower()
|
|
|
|
def test_register_user_invalid_email(self, test_client):
|
|
"""Test registration with invalid email format."""
|
|
|
|
user_data = {
|
|
"email": "invalid-email",
|
|
"password": "securepassword123",
|
|
"first_name": "Invalid",
|
|
"last_name": "Email"
|
|
}
|
|
|
|
response = test_client.post("/api/auth/register", json=user_data)
|
|
|
|
assert response.status_code == 422 # Validation error
|
|
|
|
def test_register_user_weak_password(self, test_client):
|
|
"""Test registration with weak password."""
|
|
|
|
user_data = {
|
|
"email": "weakpass@test.com",
|
|
"password": "123", # Too weak
|
|
"first_name": "Weak",
|
|
"last_name": "Password"
|
|
}
|
|
|
|
response = test_client.post("/api/auth/register", json=user_data)
|
|
|
|
assert response.status_code == 422 # Validation error
|
|
|
|
def test_login_success(self, test_client, test_user):
|
|
"""Test successful login."""
|
|
|
|
login_data = {
|
|
"username": test_user.email, # FastAPI OAuth2 uses 'username'
|
|
"password": "testpassword123" # From test_user fixture
|
|
}
|
|
|
|
response = test_client.post(
|
|
"/api/auth/login",
|
|
data=login_data, # OAuth2 expects form data
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"}
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "access_token" in data
|
|
assert "token_type" in data
|
|
assert data["token_type"] == "bearer"
|
|
|
|
def test_login_wrong_password(self, test_client, test_user):
|
|
"""Test login with wrong password."""
|
|
|
|
login_data = {
|
|
"username": test_user.email,
|
|
"password": "wrongpassword"
|
|
}
|
|
|
|
response = test_client.post(
|
|
"/api/auth/login",
|
|
data=login_data,
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"}
|
|
)
|
|
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert "incorrect" in data["detail"].lower()
|
|
|
|
def test_login_nonexistent_user(self, test_client):
|
|
"""Test login with non-existent user."""
|
|
|
|
login_data = {
|
|
"username": "nonexistent@test.com",
|
|
"password": "somepassword"
|
|
}
|
|
|
|
response = test_client.post(
|
|
"/api/auth/login",
|
|
data=login_data,
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"}
|
|
)
|
|
|
|
assert response.status_code == 401
|
|
|
|
def test_get_current_user_success(self, test_client, test_user_token):
|
|
"""Test getting current user with valid token."""
|
|
|
|
headers = {"Authorization": f"Bearer {test_user_token}"}
|
|
response = test_client.get("/api/auth/me", headers=headers)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "email" in data
|
|
assert "id" in data
|
|
assert "first_name" in data
|
|
assert "last_name" in data
|
|
assert "password" not in data # Password should never be returned
|
|
|
|
def test_get_current_user_invalid_token(self, test_client):
|
|
"""Test getting current user with invalid token."""
|
|
|
|
headers = {"Authorization": "Bearer invalid.token.here"}
|
|
response = test_client.get("/api/auth/me", headers=headers)
|
|
|
|
assert response.status_code == 401
|
|
|
|
def test_get_current_user_no_token(self, test_client):
|
|
"""Test getting current user without token."""
|
|
|
|
response = test_client.get("/api/auth/me")
|
|
|
|
assert response.status_code == 401
|
|
|
|
def test_get_current_user_expired_token(self, test_client, test_user):
|
|
"""Test getting current user with expired token."""
|
|
|
|
from datetime import timedelta
|
|
|
|
# Create expired token
|
|
token_data = {"sub": str(test_user.id), "email": test_user.email}
|
|
expired_token = create_access_token(
|
|
data=token_data,
|
|
expires_delta=timedelta(seconds=-1) # Expired
|
|
)
|
|
|
|
headers = {"Authorization": f"Bearer {expired_token}"}
|
|
response = test_client.get("/api/auth/me", headers=headers)
|
|
|
|
assert response.status_code == 401
|
|
|
|
|
|
class TestProtectedEndpoints:
|
|
"""Test protected endpoints require authentication."""
|
|
|
|
def test_protected_endpoint_without_token(self, test_client):
|
|
"""Test accessing protected endpoint without token."""
|
|
|
|
response = test_client.get("/api/applications")
|
|
|
|
assert response.status_code == 401
|
|
|
|
def test_protected_endpoint_with_valid_token(self, test_client, test_user_token):
|
|
"""Test accessing protected endpoint with valid token."""
|
|
|
|
headers = {"Authorization": f"Bearer {test_user_token}"}
|
|
response = test_client.get("/api/applications", headers=headers)
|
|
|
|
assert response.status_code == 200
|
|
|
|
def test_protected_endpoint_with_invalid_token(self, test_client):
|
|
"""Test accessing protected endpoint with invalid token."""
|
|
|
|
headers = {"Authorization": "Bearer invalid.token"}
|
|
response = test_client.get("/api/applications", headers=headers)
|
|
|
|
assert response.status_code == 401
|
|
|
|
def test_create_application_requires_auth(self, test_client):
|
|
"""Test creating application requires authentication."""
|
|
|
|
app_data = {
|
|
"company_name": "Test Corp",
|
|
"role_title": "Developer",
|
|
"job_description": "Test role",
|
|
"status": "draft"
|
|
}
|
|
|
|
response = test_client.post("/api/applications", json=app_data)
|
|
|
|
assert response.status_code == 401
|
|
|
|
def test_create_application_with_auth(self, test_client, test_user_token):
|
|
"""Test creating application with authentication."""
|
|
|
|
app_data = {
|
|
"company_name": "Auth Test Corp",
|
|
"role_title": "Developer",
|
|
"job_description": "Test role with auth",
|
|
"status": "draft"
|
|
}
|
|
|
|
headers = {"Authorization": f"Bearer {test_user_token}"}
|
|
response = test_client.post("/api/applications", json=app_data, headers=headers)
|
|
|
|
assert response.status_code == 201
|
|
data = response.json()
|
|
assert data["company_name"] == "Auth Test Corp"
|
|
|
|
|
|
class TestTokenValidation:
|
|
"""Test token validation scenarios."""
|
|
|
|
def test_malformed_token(self, test_client):
|
|
"""Test malformed token handling."""
|
|
|
|
malformed_tokens = [
|
|
"Bearer",
|
|
"Bearer ",
|
|
"not-a-token",
|
|
"Bearer not.a.jwt",
|
|
"NotBearer validtoken"
|
|
]
|
|
|
|
for token in malformed_tokens:
|
|
headers = {"Authorization": token}
|
|
response = test_client.get("/api/auth/me", headers=headers)
|
|
assert response.status_code == 401
|
|
|
|
def test_token_with_invalid_signature(self, test_client):
|
|
"""Test token with invalid signature."""
|
|
|
|
# Create a token with wrong signature
|
|
invalid_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ1c2VyMTIzIiwiZXhwIjoxNjk5OTk5OTk5fQ.invalid_signature"
|
|
|
|
headers = {"Authorization": f"Bearer {invalid_token}"}
|
|
response = test_client.get("/api/auth/me", headers=headers)
|
|
|
|
assert response.status_code == 401
|
|
|
|
def test_token_missing_required_claims(self, test_client):
|
|
"""Test token missing required claims."""
|
|
|
|
# Create token without required 'sub' claim
|
|
token_data = {"email": "test@example.com"} # Missing 'sub'
|
|
token = create_access_token(data=token_data)
|
|
|
|
headers = {"Authorization": f"Bearer {token}"}
|
|
response = test_client.get("/api/auth/me", headers=headers)
|
|
|
|
assert response.status_code == 401
|
|
|
|
|
|
class TestAuthenticationFlow:
|
|
"""Test complete authentication flows."""
|
|
|
|
def test_complete_registration_and_login_flow(self, test_client):
|
|
"""Test complete flow from registration to authenticated request."""
|
|
|
|
# 1. Register new user
|
|
user_data = {
|
|
"email": "flowtest@test.com",
|
|
"password": "securepassword123",
|
|
"first_name": "Flow",
|
|
"last_name": "Test"
|
|
}
|
|
|
|
register_response = test_client.post("/api/auth/register", json=user_data)
|
|
assert register_response.status_code == 201
|
|
|
|
register_data = register_response.json()
|
|
registration_token = register_data["access_token"]
|
|
|
|
# 2. Use registration token to access protected endpoint
|
|
headers = {"Authorization": f"Bearer {registration_token}"}
|
|
protected_response = test_client.get("/api/auth/me", headers=headers)
|
|
assert protected_response.status_code == 200
|
|
|
|
# 3. Login with same credentials
|
|
login_data = {
|
|
"username": "flowtest@test.com",
|
|
"password": "securepassword123"
|
|
}
|
|
|
|
login_response = test_client.post(
|
|
"/api/auth/login",
|
|
data=login_data,
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"}
|
|
)
|
|
assert login_response.status_code == 200
|
|
|
|
login_token = login_response.json()["access_token"]
|
|
|
|
# 4. Use login token to access protected endpoint
|
|
headers = {"Authorization": f"Bearer {login_token}"}
|
|
me_response = test_client.get("/api/auth/me", headers=headers)
|
|
assert me_response.status_code == 200
|
|
|
|
me_data = me_response.json()
|
|
assert me_data["email"] == "flowtest@test.com"
|
|
|
|
def test_user_isolation_between_tokens(self, test_client):
|
|
"""Test that different user tokens access different data."""
|
|
|
|
# Create two users
|
|
user1_data = {
|
|
"email": "user1@isolation.test",
|
|
"password": "password123",
|
|
"first_name": "User",
|
|
"last_name": "One"
|
|
}
|
|
|
|
user2_data = {
|
|
"email": "user2@isolation.test",
|
|
"password": "password123",
|
|
"first_name": "User",
|
|
"last_name": "Two"
|
|
}
|
|
|
|
# Register both users
|
|
user1_response = test_client.post("/api/auth/register", json=user1_data)
|
|
user2_response = test_client.post("/api/auth/register", json=user2_data)
|
|
|
|
assert user1_response.status_code == 201
|
|
assert user2_response.status_code == 201
|
|
|
|
user1_token = user1_response.json()["access_token"]
|
|
user2_token = user2_response.json()["access_token"]
|
|
|
|
# Get user info with each token
|
|
user1_headers = {"Authorization": f"Bearer {user1_token}"}
|
|
user2_headers = {"Authorization": f"Bearer {user2_token}"}
|
|
|
|
user1_me = test_client.get("/api/auth/me", headers=user1_headers)
|
|
user2_me = test_client.get("/api/auth/me", headers=user2_headers)
|
|
|
|
assert user1_me.status_code == 200
|
|
assert user2_me.status_code == 200
|
|
|
|
user1_data = user1_me.json()
|
|
user2_data = user2_me.json()
|
|
|
|
# Verify users are different
|
|
assert user1_data["email"] != user2_data["email"]
|
|
assert user1_data["id"] != user2_data["id"]
|
|
|
|
|
|
class TestRateLimiting:
|
|
"""Test rate limiting on authentication endpoints."""
|
|
|
|
def test_login_rate_limiting(self, test_client, test_user):
|
|
"""Test rate limiting on login attempts."""
|
|
|
|
login_data = {
|
|
"username": test_user.email,
|
|
"password": "wrongpassword"
|
|
}
|
|
|
|
# Make multiple failed login attempts
|
|
responses = []
|
|
for _ in range(10): # Assuming rate limit is higher than 10
|
|
response = test_client.post(
|
|
"/api/auth/login",
|
|
data=login_data,
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"}
|
|
)
|
|
responses.append(response)
|
|
|
|
# All should return 401 (wrong password) but not rate limited
|
|
for response in responses:
|
|
assert response.status_code == 401
|
|
# If rate limiting is implemented, some responses might be 429
|
|
|
|
def test_registration_rate_limiting(self, test_client):
|
|
"""Test rate limiting on registration attempts."""
|
|
|
|
# Make multiple registration attempts
|
|
responses = []
|
|
for i in range(5):
|
|
user_data = {
|
|
"email": f"ratelimit{i}@test.com",
|
|
"password": "password123",
|
|
"first_name": "Rate",
|
|
"last_name": f"Limit{i}"
|
|
}
|
|
|
|
response = test_client.post("/api/auth/register", json=user_data)
|
|
responses.append(response)
|
|
|
|
# Most should succeed (assuming reasonable rate limits)
|
|
successful_responses = [r for r in responses if r.status_code == 201]
|
|
assert len(successful_responses) >= 3 # At least some should succeed
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
class TestAsyncAuthEndpoints:
|
|
"""Test authentication endpoints with async client."""
|
|
|
|
async def test_async_register_user(self, async_client):
|
|
"""Test user registration with async client."""
|
|
|
|
user_data = {
|
|
"email": "async@test.com",
|
|
"password": "asyncpassword123",
|
|
"first_name": "Async",
|
|
"last_name": "User"
|
|
}
|
|
|
|
response = await async_client.post("/api/auth/register", json=user_data)
|
|
|
|
assert response.status_code == 201
|
|
data = response.json()
|
|
assert data["email"] == "async@test.com"
|
|
assert "access_token" in data
|
|
|
|
async def test_async_login_user(self, async_client, test_user):
|
|
"""Test user login with async client."""
|
|
|
|
login_data = {
|
|
"username": test_user.email,
|
|
"password": "testpassword123"
|
|
}
|
|
|
|
response = await async_client.post(
|
|
"/api/auth/login",
|
|
data=login_data,
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"}
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "access_token" in data
|
|
|
|
async def test_async_protected_endpoint(self, async_client, test_user_token):
|
|
"""Test protected endpoint with async client."""
|
|
|
|
headers = {"Authorization": f"Bearer {test_user_token}"}
|
|
response = await async_client.get("/api/auth/me", headers=headers)
|
|
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "email" in data |