"""Integration tests for API authentication.

The tests exercise the registration, login, current-user and protected
application endpoints through HTTP clients supplied by fixtures
(``test_client``, ``async_client``, ``test_user``, ``test_user_token``).
Those fixtures are not defined in this module -- presumably they live in a
shared ``conftest.py``; verify there.
"""

import json
from datetime import timedelta

import pytest
from fastapi.testclient import TestClient
from httpx import AsyncClient

from app.core.security import create_access_token
from app.main import app

# Endpoint paths used throughout the module.
REGISTER_URL = "/api/auth/register"
LOGIN_URL = "/api/auth/login"
ME_URL = "/api/auth/me"
APPLICATIONS_URL = "/api/applications"

# The login endpoint is called with form-encoded data (OAuth2 password flow).
FORM_HEADERS = {"Content-Type": "application/x-www-form-urlencoded"}

# Plain-text password of the ``test_user`` fixture (per the original tests).
TEST_USER_PASSWORD = "testpassword123"


def _bearer(token):
    """Return request headers carrying *token* as a Bearer credential."""
    return {"Authorization": f"Bearer {token}"}


def _login(client, username, password):
    """POST form-encoded credentials to the login endpoint.

    FastAPI's OAuth2 form uses the field name ``username`` even though the
    value sent here is an e-mail address.

    Returns whatever ``client.post`` returns: a response for the synchronous
    test client, or an awaitable when *client* is the async client.
    """
    return client.post(
        LOGIN_URL,
        data={"username": username, "password": password},
        headers=FORM_HEADERS,
    )


class TestAuthenticationEndpoints:
    """Test authentication API endpoints."""

    def test_register_user_success(self, test_client):
        """Test successful user registration."""
        user_data = {
            "email": "newuser@test.com",
            "password": "securepassword123",
            "first_name": "New",
            "last_name": "User",
        }

        response = test_client.post(REGISTER_URL, json=user_data)

        assert response.status_code == 201
        data = response.json()
        assert data["email"] == "newuser@test.com"
        assert data["first_name"] == "New"
        assert data["last_name"] == "User"
        assert "password" not in data  # Password should not be returned
        assert "access_token" in data
        assert "token_type" in data

    def test_register_user_duplicate_email(self, test_client, test_user):
        """Test registration with duplicate email."""
        user_data = {
            "email": test_user.email,  # Use existing user's email
            "password": "differentpassword",
            "first_name": "Duplicate",
            "last_name": "User",
        }

        response = test_client.post(REGISTER_URL, json=user_data)

        assert response.status_code == 400
        data = response.json()
        assert "email already registered" in data["detail"].lower()

    def test_register_user_invalid_email(self, test_client):
        """Test registration with invalid email format."""
        user_data = {
            "email": "invalid-email",
            "password": "securepassword123",
            "first_name": "Invalid",
            "last_name": "Email",
        }

        response = test_client.post(REGISTER_URL, json=user_data)

        assert response.status_code == 422  # Validation error

    def test_register_user_weak_password(self, test_client):
        """Test registration with weak password."""
        user_data = {
            "email": "weakpass@test.com",
            "password": "123",  # Too weak
            "first_name": "Weak",
            "last_name": "Password",
        }

        response = test_client.post(REGISTER_URL, json=user_data)

        assert response.status_code == 422  # Validation error

    def test_login_success(self, test_client, test_user):
        """Test successful login."""
        response = _login(test_client, test_user.email, TEST_USER_PASSWORD)

        assert response.status_code == 200
        data = response.json()
        assert "access_token" in data
        assert "token_type" in data
        assert data["token_type"] == "bearer"

    def test_login_wrong_password(self, test_client, test_user):
        """Test login with wrong password."""
        response = _login(test_client, test_user.email, "wrongpassword")

        assert response.status_code == 401
        data = response.json()
        assert "incorrect" in data["detail"].lower()

    def test_login_nonexistent_user(self, test_client):
        """Test login with non-existent user."""
        response = _login(test_client, "nonexistent@test.com", "somepassword")

        assert response.status_code == 401

    def test_get_current_user_success(self, test_client, test_user_token):
        """Test getting current user with valid token."""
        response = test_client.get(ME_URL, headers=_bearer(test_user_token))

        assert response.status_code == 200
        data = response.json()
        assert "email" in data
        assert "id" in data
        assert "first_name" in data
        assert "last_name" in data
        assert "password" not in data  # Password should never be returned

    def test_get_current_user_invalid_token(self, test_client):
        """Test getting current user with invalid token."""
        response = test_client.get(ME_URL, headers=_bearer("invalid.token.here"))

        assert response.status_code == 401

    def test_get_current_user_no_token(self, test_client):
        """Test getting current user without token."""
        response = test_client.get(ME_URL)

        assert response.status_code == 401

    def test_get_current_user_expired_token(self, test_client, test_user):
        """Test getting current user with expired token."""
        token_data = {"sub": str(test_user.id), "email": test_user.email}
        # A negative lifetime is passed so the token is already expired
        # when it is presented.
        expired_token = create_access_token(
            data=token_data,
            expires_delta=timedelta(seconds=-1),
        )

        response = test_client.get(ME_URL, headers=_bearer(expired_token))

        assert response.status_code == 401


class TestProtectedEndpoints:
    """Test protected endpoints require authentication."""

    def test_protected_endpoint_without_token(self, test_client):
        """Test accessing protected endpoint without token."""
        response = test_client.get(APPLICATIONS_URL)

        assert response.status_code == 401

    def test_protected_endpoint_with_valid_token(self, test_client, test_user_token):
        """Test accessing protected endpoint with valid token."""
        response = test_client.get(
            APPLICATIONS_URL, headers=_bearer(test_user_token)
        )

        assert response.status_code == 200

    def test_protected_endpoint_with_invalid_token(self, test_client):
        """Test accessing protected endpoint with invalid token."""
        response = test_client.get(
            APPLICATIONS_URL, headers=_bearer("invalid.token")
        )

        assert response.status_code == 401

    def test_create_application_requires_auth(self, test_client):
        """Test creating application requires authentication."""
        app_data = {
            "company_name": "Test Corp",
            "role_title": "Developer",
            "job_description": "Test role",
            "status": "draft",
        }

        response = test_client.post(APPLICATIONS_URL, json=app_data)

        assert response.status_code == 401

    def test_create_application_with_auth(self, test_client, test_user_token):
        """Test creating application with authentication."""
        app_data = {
            "company_name": "Auth Test Corp",
            "role_title": "Developer",
            "job_description": "Test role with auth",
            "status": "draft",
        }

        response = test_client.post(
            APPLICATIONS_URL,
            json=app_data,
            headers=_bearer(test_user_token),
        )

        assert response.status_code == 201
        data = response.json()
        assert data["company_name"] == "Auth Test Corp"


class TestTokenValidation:
    """Test token validation scenarios."""

    # One test case per header value, so a failure names the offending
    # header and does not hide the remaining cases.
    @pytest.mark.parametrize(
        "authorization",
        [
            "Bearer",
            "Bearer ",
            "not-a-token",
            "Bearer not.a.jwt",
            "NotBearer validtoken",
        ],
    )
    def test_malformed_token(self, test_client, authorization):
        """Test malformed token handling."""
        response = test_client.get(ME_URL, headers={"Authorization": authorization})

        assert response.status_code == 401

    def test_token_with_invalid_signature(self, test_client):
        """Test token with invalid signature."""
        # JWT-shaped value whose third segment is not a real signature.
        invalid_token = (
            "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9"
            ".eyJzdWIiOiJ1c2VyMTIzIiwiZXhwIjoxNjk5OTk5OTk5fQ"
            ".invalid_signature"
        )

        response = test_client.get(ME_URL, headers=_bearer(invalid_token))

        assert response.status_code == 401

    def test_token_missing_required_claims(self, test_client):
        """Test token missing required claims."""
        token_data = {"email": "test@example.com"}  # Missing 'sub'
        token = create_access_token(data=token_data)

        response = test_client.get(ME_URL, headers=_bearer(token))

        assert response.status_code == 401


class TestAuthenticationFlow:
    """Test complete authentication flows."""

    def test_complete_registration_and_login_flow(self, test_client):
        """Test complete flow from registration to authenticated request."""
        # 1. Register new user
        user_data = {
            "email": "flowtest@test.com",
            "password": "securepassword123",
            "first_name": "Flow",
            "last_name": "Test",
        }
        register_response = test_client.post(REGISTER_URL, json=user_data)
        assert register_response.status_code == 201
        registration_token = register_response.json()["access_token"]

        # 2. Use registration token to access protected endpoint
        protected_response = test_client.get(
            ME_URL, headers=_bearer(registration_token)
        )
        assert protected_response.status_code == 200

        # 3. Login with same credentials
        login_response = _login(
            test_client, "flowtest@test.com", "securepassword123"
        )
        assert login_response.status_code == 200
        login_token = login_response.json()["access_token"]

        # 4. Use login token to access protected endpoint
        me_response = test_client.get(ME_URL, headers=_bearer(login_token))
        assert me_response.status_code == 200
        me_data = me_response.json()
        assert me_data["email"] == "flowtest@test.com"

    def test_user_isolation_between_tokens(self, test_client):
        """Test that different user tokens access different data."""
        # NOTE(review): these addresses previously used the reserved ``.test``
        # TLD. The ``email-validator`` package (used by pydantic's EmailStr)
        # rejects special-use domains by default, which would turn the
        # registrations below into 422s if the API validates with it --
        # confirm against the registration schema. An ordinary domain keeps
        # the test independent of that behaviour.
        user1_data = {
            "email": "user1@isolation-test.com",
            "password": "password123",
            "first_name": "User",
            "last_name": "One",
        }
        user2_data = {
            "email": "user2@isolation-test.com",
            "password": "password123",
            "first_name": "User",
            "last_name": "Two",
        }

        # Register both users
        user1_response = test_client.post(REGISTER_URL, json=user1_data)
        user2_response = test_client.post(REGISTER_URL, json=user2_data)
        assert user1_response.status_code == 201
        assert user2_response.status_code == 201

        user1_token = user1_response.json()["access_token"]
        user2_token = user2_response.json()["access_token"]

        # Get user info with each token
        user1_me = test_client.get(ME_URL, headers=_bearer(user1_token))
        user2_me = test_client.get(ME_URL, headers=_bearer(user2_token))
        assert user1_me.status_code == 200
        assert user2_me.status_code == 200

        # Distinct names for the returned profiles so the request payloads
        # above stay available for comparison.
        user1_profile = user1_me.json()
        user2_profile = user2_me.json()

        # Each token must resolve to its own account ...
        assert user1_profile["email"] == user1_data["email"]
        assert user2_profile["email"] == user2_data["email"]
        # ... and the two accounts must be different.
        assert user1_profile["email"] != user2_profile["email"]
        assert user1_profile["id"] != user2_profile["id"]


class TestRateLimiting:
    """Test rate limiting on authentication endpoints."""

    def test_login_rate_limiting(self, test_client, test_user):
        """Test rate limiting on login attempts."""
        # Make multiple failed login attempts.
        responses = [
            _login(test_client, test_user.email, "wrongpassword")
            for _ in range(10)
        ]

        # NOTE: this assumes any login rate limit allows more than 10
        # attempts, so every attempt is rejected for the wrong password (401).
        # If a stricter limiter is introduced, some attempts would return 429
        # and this assertion must be relaxed accordingly.
        for response in responses:
            assert response.status_code == 401

    def test_registration_rate_limiting(self, test_client):
        """Test rate limiting on registration attempts."""
        # Make multiple registration attempts with distinct e-mail addresses.
        responses = [
            test_client.post(
                REGISTER_URL,
                json={
                    "email": f"ratelimit{i}@test.com",
                    "password": "password123",
                    "first_name": "Rate",
                    "last_name": f"Limit{i}",
                },
            )
            for i in range(5)
        ]

        # Most should succeed (assuming reasonable rate limits).
        successful_responses = [r for r in responses if r.status_code == 201]
        assert len(successful_responses) >= 3  # At least some should succeed


@pytest.mark.asyncio
class TestAsyncAuthEndpoints:
    """Test authentication endpoints with async client."""

    async def test_async_register_user(self, async_client):
        """Test user registration with async client."""
        user_data = {
            "email": "async@test.com",
            "password": "asyncpassword123",
            "first_name": "Async",
            "last_name": "User",
        }

        response = await async_client.post(REGISTER_URL, json=user_data)

        assert response.status_code == 201
        data = response.json()
        assert data["email"] == "async@test.com"
        assert "access_token" in data

    async def test_async_login_user(self, async_client, test_user):
        """Test user login with async client."""
        # ``_login`` hands back the async client's awaitable unchanged.
        response = await _login(async_client, test_user.email, TEST_USER_PASSWORD)

        assert response.status_code == 200
        data = response.json()
        assert "access_token" in data

    async def test_async_protected_endpoint(self, async_client, test_user_token):
        """Test protected endpoint with async client."""
        response = await async_client.get(ME_URL, headers=_bearer(test_user_token))

        assert response.status_code == 200
        data = response.json()
        assert "email" in data