diff --git a/tests/auth/test_jwt.py b/tests/auth/test_jwt.py index 9cb2fcf..5eb81ec 100644 --- a/tests/auth/test_jwt.py +++ b/tests/auth/test_jwt.py @@ -13,42 +13,44 @@ from wikijs.exceptions import AuthenticationError class TestJWTAuth: """Test JWTAuth implementation.""" - def test_init_with_valid_token(self, mock_jwt_token): + def test_init_with_valid_token(self, mock_jwt_token, mock_wiki_base_url): """Test initialization with valid JWT token.""" - auth = JWTAuth(mock_jwt_token) + auth = JWTAuth(mock_jwt_token, mock_wiki_base_url) assert auth._token == mock_jwt_token + assert auth._base_url == mock_wiki_base_url assert auth._refresh_token is None assert auth._expires_at is None - def test_init_with_all_parameters(self, mock_jwt_token): + def test_init_with_all_parameters(self, mock_jwt_token, mock_wiki_base_url): """Test initialization with all parameters.""" refresh_token = "refresh-token-123" expires_at = time.time() + 3600 - auth = JWTAuth(mock_jwt_token, refresh_token, expires_at) + auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, refresh_token, expires_at) assert auth._token == mock_jwt_token + assert auth._base_url == mock_wiki_base_url assert auth._refresh_token == refresh_token assert auth._expires_at == expires_at - def test_init_with_whitespace_token(self): + def test_init_with_whitespace_token(self, mock_wiki_base_url): """Test initialization trims whitespace from token.""" - auth = JWTAuth(" test-token ") + auth = JWTAuth(" test-token ", mock_wiki_base_url) assert auth._token == "test-token" - def test_init_with_empty_token_raises_error(self): + def test_init_with_empty_token_raises_error(self, mock_wiki_base_url): """Test that empty JWT token raises ValueError.""" with pytest.raises(ValueError, match="JWT token cannot be empty"): - JWTAuth("") + JWTAuth("", mock_wiki_base_url) - def test_init_with_whitespace_only_token_raises_error(self): + def test_init_with_whitespace_only_token_raises_error(self, mock_wiki_base_url): """Test that whitespace-only JWT token raises ValueError.""" with pytest.raises(ValueError, match="JWT token cannot be empty"): - JWTAuth(" ") + JWTAuth(" ", mock_wiki_base_url) - def test_init_with_none_raises_error(self): + def test_init_with_none_raises_error(self, mock_wiki_base_url): """Test that None JWT token raises ValueError.""" with pytest.raises(ValueError, match="JWT token cannot be empty"): - JWTAuth(None) + JWTAuth(None, mock_wiki_base_url) def test_get_headers_returns_bearer_token(self, jwt_auth, mock_jwt_token): """Test that get_headers returns proper Authorization header.""" @@ -60,17 +62,17 @@ class TestJWTAuth: } assert headers == expected_headers - def test_get_headers_attempts_refresh_if_invalid(self, mock_jwt_token): + def test_get_headers_attempts_refresh_if_invalid(self, mock_jwt_token, mock_wiki_base_url): """Test that get_headers attempts refresh if token is invalid.""" # Create JWT with expired token expires_at = time.time() - 3600 # Expired 1 hour ago refresh_token = "refresh-token-123" - auth = JWTAuth(mock_jwt_token, refresh_token, expires_at) + auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, refresh_token, expires_at) # Mock the refresh method to avoid actual implementation with patch.object(auth, "refresh") as mock_refresh: - mock_refresh.side_effect = AuthenticationError("Refresh not implemented") + mock_refresh.side_effect = AuthenticationError("Refresh failed") with pytest.raises(AuthenticationError): auth.get_headers() @@ -81,28 +83,28 @@ class TestJWTAuth: """Test that is_valid returns True for valid token without expiry.""" assert jwt_auth.is_valid() is True - def test_is_valid_returns_true_for_non_expired_token(self, mock_jwt_token): + def test_is_valid_returns_true_for_non_expired_token(self, mock_jwt_token, mock_wiki_base_url): """Test that is_valid returns True for non-expired token.""" expires_at = time.time() + 3600 # Expires in 1 hour - auth = JWTAuth(mock_jwt_token, expires_at=expires_at) + auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, expires_at=expires_at) assert auth.is_valid() is True - def test_is_valid_returns_false_for_expired_token(self, mock_jwt_token): + def test_is_valid_returns_false_for_expired_token(self, mock_jwt_token, mock_wiki_base_url): """Test that is_valid returns False for expired token.""" expires_at = time.time() - 3600 # Expired 1 hour ago - auth = JWTAuth(mock_jwt_token, expires_at=expires_at) + auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, expires_at=expires_at) assert auth.is_valid() is False - def test_is_valid_considers_refresh_buffer(self, mock_jwt_token): + def test_is_valid_considers_refresh_buffer(self, mock_jwt_token, mock_wiki_base_url): """Test that is_valid considers refresh buffer.""" # Token expires in 4 minutes (less than 5 minute buffer) expires_at = time.time() + 240 - auth = JWTAuth(mock_jwt_token, expires_at=expires_at) + auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, expires_at=expires_at) assert auth.is_valid() is False # Should be invalid due to buffer - def test_is_valid_returns_false_for_empty_token(self, mock_jwt_token): + def test_is_valid_returns_false_for_empty_token(self, mock_jwt_token, mock_wiki_base_url): """Test that is_valid handles edge cases.""" - auth = JWTAuth(mock_jwt_token) + auth = JWTAuth(mock_jwt_token, mock_wiki_base_url) auth._token = "" assert auth.is_valid() is False @@ -117,57 +119,60 @@ class TestJWTAuth: ): jwt_auth.refresh() - def test_refresh_raises_not_implemented_error(self, mock_jwt_token): - """Test that refresh raises not implemented error.""" + def test_refresh_with_refresh_token(self, mock_jwt_token, mock_wiki_base_url): + """Test that refresh is now implemented with refresh token.""" refresh_token = "refresh-token-123" - auth = JWTAuth(mock_jwt_token, refresh_token) + auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, refresh_token) - with pytest.raises( - AuthenticationError, match="JWT token refresh not yet implemented" - ): - auth.refresh() + # Mock the requests.post call + with patch("requests.post") as mock_post: + # Simulate failed refresh (network issue) + mock_post.side_effect = Exception("Network error") + + with pytest.raises(AuthenticationError, match="Unexpected error during token refresh"): + auth.refresh() def test_is_expired_returns_false_no_expiry(self, jwt_auth): """Test that is_expired returns False when no expiry set.""" assert jwt_auth.is_expired() is False - def test_is_expired_returns_false_for_valid_token(self, mock_jwt_token): + def test_is_expired_returns_false_for_valid_token(self, mock_jwt_token, mock_wiki_base_url): """Test that is_expired returns False for valid token.""" expires_at = time.time() + 3600 # Expires in 1 hour - auth = JWTAuth(mock_jwt_token, expires_at=expires_at) + auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, expires_at=expires_at) assert auth.is_expired() is False - def test_is_expired_returns_true_for_expired_token(self, mock_jwt_token): + def test_is_expired_returns_true_for_expired_token(self, mock_jwt_token, mock_wiki_base_url): """Test that is_expired returns True for expired token.""" expires_at = time.time() - 3600 # Expired 1 hour ago - auth = JWTAuth(mock_jwt_token, expires_at=expires_at) + auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, expires_at=expires_at) assert auth.is_expired() is True def test_time_until_expiry_returns_none_no_expiry(self, jwt_auth): """Test that time_until_expiry returns None when no expiry set.""" assert jwt_auth.time_until_expiry() is None - def test_time_until_expiry_returns_correct_delta(self, mock_jwt_token): + def test_time_until_expiry_returns_correct_delta(self, mock_jwt_token, mock_wiki_base_url): """Test that time_until_expiry returns correct timedelta.""" expires_at = time.time() + 3600 # Expires in 1 hour - auth = JWTAuth(mock_jwt_token, expires_at=expires_at) + auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, expires_at=expires_at) time_left = auth.time_until_expiry() assert isinstance(time_left, timedelta) # Should be approximately 1 hour (allowing for small time differences) assert 3550 <= time_left.total_seconds() <= 3600 - def test_time_until_expiry_returns_zero_for_expired(self, mock_jwt_token): + def test_time_until_expiry_returns_zero_for_expired(self, mock_jwt_token, mock_wiki_base_url): """Test that time_until_expiry returns zero for expired token.""" expires_at = time.time() - 3600 # Expired 1 hour ago - auth = JWTAuth(mock_jwt_token, expires_at=expires_at) + auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, expires_at=expires_at) time_left = auth.time_until_expiry() assert time_left.total_seconds() == 0 - def test_token_preview_masks_token(self, mock_jwt_token): + def test_token_preview_masks_token(self, mock_jwt_token, mock_wiki_base_url): """Test that token_preview masks the token for security.""" - auth = JWTAuth(mock_jwt_token) + auth = JWTAuth(mock_jwt_token, mock_wiki_base_url) preview = auth.token_preview assert preview != mock_jwt_token # Should not show full token @@ -175,25 +180,25 @@ class TestJWTAuth: assert preview.endswith(mock_jwt_token[-10:]) assert "..." in preview - def test_token_preview_handles_short_token(self): + def test_token_preview_handles_short_token(self, mock_wiki_base_url): """Test that token_preview handles short tokens.""" short_token = "short" - auth = JWTAuth(short_token) + auth = JWTAuth(short_token, mock_wiki_base_url) preview = auth.token_preview assert preview == "*****" # Should be all asterisks - def test_token_preview_handles_none_token(self, mock_jwt_token): + def test_token_preview_handles_none_token(self, mock_jwt_token, mock_wiki_base_url): """Test that token_preview handles None token.""" - auth = JWTAuth(mock_jwt_token) + auth = JWTAuth(mock_jwt_token, mock_wiki_base_url) auth._token = None assert auth.token_preview == "None" - def test_repr_shows_masked_token(self, mock_jwt_token): + def test_repr_shows_masked_token(self, mock_jwt_token, mock_wiki_base_url): """Test that __repr__ shows masked token.""" expires_at = time.time() + 3600 - auth = JWTAuth(mock_jwt_token, expires_at=expires_at) + auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, expires_at=expires_at) repr_str = repr(auth) assert "JWTAuth" in repr_str @@ -207,12 +212,12 @@ class TestJWTAuth: jwt_auth.validate_credentials() assert jwt_auth.is_valid() is True - def test_refresh_token_whitespace_handling(self, mock_jwt_token): + def test_refresh_token_whitespace_handling(self, mock_jwt_token, mock_wiki_base_url): """Test that refresh token whitespace is handled correctly.""" refresh_token = " refresh-token-123 " - auth = JWTAuth(mock_jwt_token, refresh_token) + auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, refresh_token) assert auth._refresh_token == "refresh-token-123" # Test None refresh token - auth = JWTAuth(mock_jwt_token, None) + auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, None) assert auth._refresh_token is None diff --git a/tests/conftest.py b/tests/conftest.py index 310d551..439cdd2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -25,9 +25,9 @@ def api_key_auth(mock_api_key): @pytest.fixture -def jwt_auth(mock_jwt_token): +def jwt_auth(mock_jwt_token, mock_wiki_base_url): """Fixture providing JWTAuth instance.""" - return JWTAuth(mock_jwt_token) + return JWTAuth(mock_jwt_token, mock_wiki_base_url) @pytest.fixture diff --git a/tests/test_client.py b/tests/test_client.py index 10788bd..9a1fcaa 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -81,40 +81,50 @@ class TestWikiJSClientTestConnection: """Mock API key.""" return "test-api-key-12345" - @patch("wikijs.client.requests.Session.get") - def test_test_connection_success(self, mock_get, mock_wiki_base_url, mock_api_key): - """Test successful connection test.""" + @patch("wikijs.client.requests.Session.request") + def test_test_connection_success(self, mock_request, mock_wiki_base_url, mock_api_key): + """Test successful connection test using GraphQL query.""" mock_response = Mock() + mock_response.ok = True mock_response.status_code = 200 - mock_get.return_value = mock_response + mock_response.json.return_value = { + "data": { + "site": { + "title": "Test Wiki" + } + } + } + mock_request.return_value = mock_response client = WikiJSClient(mock_wiki_base_url, auth=mock_api_key) result = client.test_connection() assert result is True + # Verify it made a POST request to GraphQL endpoint + mock_request.assert_called_once() - @patch("wikijs.client.requests.Session.get") - def test_test_connection_timeout(self, mock_get, mock_wiki_base_url, mock_api_key): + @patch("wikijs.client.requests.Session.request") + def test_test_connection_timeout(self, mock_request, mock_wiki_base_url, mock_api_key): """Test connection test timeout.""" import requests - mock_get.side_effect = requests.exceptions.Timeout("Request timed out") + mock_request.side_effect = requests.exceptions.Timeout("Request timed out") client = WikiJSClient(mock_wiki_base_url, auth=mock_api_key) - with pytest.raises(TimeoutError, match="Connection test timed out"): + with pytest.raises(TimeoutError): client.test_connection() - @patch("wikijs.client.requests.Session.get") - def test_test_connection_error(self, mock_get, mock_wiki_base_url, mock_api_key): + @patch("wikijs.client.requests.Session.request") + def test_test_connection_error(self, mock_request, mock_wiki_base_url, mock_api_key): """Test connection test with connection error.""" import requests - mock_get.side_effect = requests.exceptions.ConnectionError("Connection failed") + mock_request.side_effect = requests.exceptions.ConnectionError("Connection failed") client = WikiJSClient(mock_wiki_base_url, auth=mock_api_key) - with pytest.raises(ConnectionError, match="Cannot connect"): + with pytest.raises(ConnectionError): client.test_connection() def test_test_connection_no_base_url(self): @@ -336,7 +346,7 @@ class TestWikiJSClientContextManager: mock_session_class.return_value = mock_session # Mock generic exception during connection test - mock_session.get.side_effect = RuntimeError("Unexpected error") + mock_session.request.side_effect = RuntimeError("Unexpected error") client = WikiJSClient("https://wiki.example.com", auth="test-key") diff --git a/wikijs/auth/jwt.py b/wikijs/auth/jwt.py index 2fdd236..e9fc17f 100644 --- a/wikijs/auth/jwt.py +++ b/wikijs/auth/jwt.py @@ -19,17 +19,23 @@ class JWTAuth(AuthHandler): Args: token: The JWT token string. + base_url: The base URL of the Wiki.js instance (needed for token refresh). refresh_token: Optional refresh token for automatic renewal. expires_at: Optional expiration timestamp (Unix timestamp). Example: - >>> auth = JWTAuth("eyJ0eXAiOiJKV1QiLCJhbGc...") + >>> auth = JWTAuth( + ... token="eyJ0eXAiOiJKV1QiLCJhbGc...", + ... base_url="https://wiki.example.com", + ... refresh_token="refresh_token_here" + ... ) >>> client = WikiJSClient("https://wiki.example.com", auth=auth) """ def __init__( self, token: str, + base_url: str, refresh_token: Optional[str] = None, expires_at: Optional[float] = None, ) -> None: @@ -37,16 +43,21 @@ class JWTAuth(AuthHandler): Args: token: The JWT token string. + base_url: The base URL of the Wiki.js instance. refresh_token: Optional refresh token for automatic renewal. expires_at: Optional expiration timestamp (Unix timestamp). Raises: - ValueError: If token is empty or None. + ValueError: If token or base_url is empty or None. """ if not token or not token.strip(): raise ValueError("JWT token cannot be empty") + if not base_url or not base_url.strip(): + raise ValueError("Base URL cannot be empty") + self._token = token.strip() + self._base_url = base_url.strip().rstrip("/") self._refresh_token = refresh_token.strip() if refresh_token else None self._expires_at = expires_at self._refresh_buffer = 300 # Refresh 5 minutes before expiration @@ -91,15 +102,13 @@ class JWTAuth(AuthHandler): def refresh(self) -> None: """Refresh the JWT token using the refresh token. - This method attempts to refresh the JWT token using the refresh token. - If no refresh token is available, it raises an AuthenticationError. - - Note: This is a placeholder implementation. In a real implementation, - this would make an HTTP request to the Wiki.js token refresh endpoint. + This method attempts to refresh the JWT token using the refresh token + by making a request to the Wiki.js authentication endpoint. Raises: AuthenticationError: If refresh token is not available or refresh fails. """ + import requests from ..exceptions import AuthenticationError if not self._refresh_token: @@ -107,16 +116,57 @@ class JWTAuth(AuthHandler): "JWT token expired and no refresh token available" ) - # TODO: Implement actual token refresh logic - # This would typically involve: - # 1. Making a POST request to /auth/refresh endpoint - # 2. Sending the refresh token - # 3. Updating self._token and self._expires_at with the response + try: + # Make request to Wiki.js token refresh endpoint + refresh_url = f"{self._base_url}/api/auth/refresh" - raise AuthenticationError( - "JWT token refresh not yet implemented. " - "Please provide a new token or use API key authentication." - ) + response = requests.post( + refresh_url, + json={"refreshToken": self._refresh_token}, + headers={ + "Content-Type": "application/json", + "Accept": "application/json", + }, + timeout=30, + ) + + # Check if request was successful + if not response.ok: + error_msg = "Token refresh failed" + try: + error_data = response.json() + error_msg = error_data.get("message", error_msg) + except Exception: + pass + + raise AuthenticationError( + f"Failed to refresh JWT token: {error_msg} (HTTP {response.status_code})" + ) + + # Parse response + data = response.json() + + # Update token and expiration + if "token" in data: + self._token = data["token"] + + if "expiresAt" in data: + self._expires_at = data["expiresAt"] + elif "expires_at" in data: + self._expires_at = data["expires_at"] + + # Optionally update refresh token if a new one is provided + if "refreshToken" in data: + self._refresh_token = data["refreshToken"] + + except requests.exceptions.RequestException as e: + raise AuthenticationError( + f"Failed to refresh JWT token: {str(e)}" + ) from e + except Exception as e: + raise AuthenticationError( + f"Unexpected error during token refresh: {str(e)}" + ) from e def is_expired(self) -> bool: """Check if the JWT token is expired. diff --git a/wikijs/client.py b/wikijs/client.py index 144c1bb..e8164f1 100644 --- a/wikijs/client.py +++ b/wikijs/client.py @@ -23,6 +23,7 @@ from .utils import ( normalize_url, parse_wiki_response, ) +from .version import __version__ class WikiJSClient: @@ -82,7 +83,7 @@ class WikiJSClient: # Request configuration self.timeout = timeout self.verify_ssl = verify_ssl - self.user_agent = user_agent or "wikijs-python-sdk/0.1.0" + self.user_agent = user_agent or f"wikijs-python-sdk/{__version__}" # Initialize HTTP session self._session = self._create_session() @@ -229,6 +230,9 @@ class WikiJSClient: def test_connection(self) -> bool: """Test connection to Wiki.js instance. + This method validates the connection by making an actual GraphQL query + to the Wiki.js API, ensuring both connectivity and authentication work. + Returns: True if connection successful @@ -236,6 +240,7 @@ class WikiJSClient: ConfigurationError: If client is not properly configured ConnectionError: If cannot connect to server AuthenticationError: If authentication fails + TimeoutError: If connection test times out """ if not self.base_url: raise ConfigurationError("Base URL not configured") @@ -244,20 +249,47 @@ class WikiJSClient: raise ConfigurationError("Authentication not configured") try: - # Try to hit a basic endpoint (will implement with actual endpoints) - # For now, just test basic connectivity - self._session.get( - self.base_url, timeout=self.timeout, verify=self.verify_ssl - ) + # Test with minimal GraphQL query to validate API access + query = """ + query { + site { + title + } + } + """ + + response = self._request("POST", "/graphql", json_data={"query": query}) + + # Check for GraphQL errors + if "errors" in response: + error_msg = response["errors"][0].get("message", "Unknown error") + raise AuthenticationError( + f"GraphQL query failed: {error_msg}" + ) + + # Verify we got expected data structure + if "data" not in response or "site" not in response["data"]: + raise APIError( + "Unexpected response format from Wiki.js API" + ) + return True - except requests.exceptions.Timeout: - raise TimeoutError( - f"Connection test timed out after {self.timeout} seconds" - ) + except AuthenticationError: + # Re-raise authentication errors as-is + raise - except requests.exceptions.ConnectionError as e: - raise ConnectionError(f"Cannot connect to {self.base_url}: {str(e)}") + except TimeoutError: + # Re-raise timeout errors as-is + raise + + except ConnectionError: + # Re-raise connection errors as-is + raise + + except APIError: + # Re-raise API errors as-is + raise except Exception as e: raise ConnectionError(f"Connection test failed: {str(e)}") diff --git a/wikijs/exceptions.py b/wikijs/exceptions.py index 9812bc3..e383f4b 100644 --- a/wikijs/exceptions.py +++ b/wikijs/exceptions.py @@ -72,13 +72,19 @@ class RateLimitError(ClientError): self.retry_after = retry_after -class ConnectionError(WikiJSException): +class ConnectionError(APIError): """Raised when there's a connection issue.""" + def __init__(self, message: str, details: Optional[Dict[str, Any]] = None) -> None: + super().__init__(message, status_code=None, response=None, details=details) -class TimeoutError(WikiJSException): + +class TimeoutError(APIError): """Raised when a request times out.""" + def __init__(self, message: str, details: Optional[Dict[str, Any]] = None) -> None: + super().__init__(message, status_code=None, response=None, details=details) + def create_api_error(status_code: int, message: str, response: Any = None) -> APIError: """Create appropriate API error based on status code.