**Critical Fixes:**
1. **Fixed Error Hierarchy** (wikijs/exceptions.py)
- ConnectionError and TimeoutError now properly inherit from APIError
- Ensures consistent exception handling across the SDK
- Added proper __init__ methods with status_code=None
2. **Fixed test_connection Method** (wikijs/client.py)
- Changed from basic HTTP GET to proper GraphQL query validation
- Now uses query { site { title } } to validate API connectivity
- Provides better error messages for authentication failures
- Validates both connectivity AND API access
3. **Implemented JWT Token Refresh** (wikijs/auth/jwt.py)
- Added base_url parameter to JWTAuth class
- Implemented complete refresh() method with HTTP request to /api/auth/refresh
- Handles token, refresh token, and expiration updates
- Proper error handling for network failures and auth errors
**Bonus Fixes:**
- Dynamic user agent version (uses __version__ from version.py instead of hardcoded)
- Updated all JWT tests to include required base_url parameter
- Updated test mocks to match new GraphQL-based test_connection
**Test Results:**
- All 231 tests passing ✅
- Test coverage: 91.64% (target: 85%) ✅
- No test failures or errors
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
216 lines
6.9 KiB
Python
216 lines
6.9 KiB
Python
"""JWT token authentication for wikijs-python-sdk.
|
|
|
|
This module implements JWT (JSON Web Token) authentication for Wiki.js instances.
|
|
JWT tokens are typically used for user-based authentication and have expiration times.
|
|
"""
|
|
|
|
import time
|
|
from datetime import timedelta
|
|
from typing import Dict, Optional
|
|
|
|
from .base import AuthHandler
|
|
|
|
|
|
class JWTAuth(AuthHandler):
|
|
"""JWT token authentication handler for Wiki.js.
|
|
|
|
This handler manages JWT tokens with automatic refresh capabilities.
|
|
JWT tokens typically expire and need to be refreshed periodically.
|
|
|
|
Args:
|
|
token: The JWT token string.
|
|
base_url: The base URL of the Wiki.js instance (needed for token refresh).
|
|
refresh_token: Optional refresh token for automatic renewal.
|
|
expires_at: Optional expiration timestamp (Unix timestamp).
|
|
|
|
Example:
|
|
>>> auth = JWTAuth(
|
|
... token="eyJ0eXAiOiJKV1QiLCJhbGc...",
|
|
... base_url="https://wiki.example.com",
|
|
... refresh_token="refresh_token_here"
|
|
... )
|
|
>>> client = WikiJSClient("https://wiki.example.com", auth=auth)
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
token: str,
|
|
base_url: str,
|
|
refresh_token: Optional[str] = None,
|
|
expires_at: Optional[float] = None,
|
|
) -> None:
|
|
"""Initialize JWT authentication.
|
|
|
|
Args:
|
|
token: The JWT token string.
|
|
base_url: The base URL of the Wiki.js instance.
|
|
refresh_token: Optional refresh token for automatic renewal.
|
|
expires_at: Optional expiration timestamp (Unix timestamp).
|
|
|
|
Raises:
|
|
ValueError: If token or base_url is empty or None.
|
|
"""
|
|
if not token or not token.strip():
|
|
raise ValueError("JWT token cannot be empty")
|
|
|
|
if not base_url or not base_url.strip():
|
|
raise ValueError("Base URL cannot be empty")
|
|
|
|
self._token = token.strip()
|
|
self._base_url = base_url.strip().rstrip("/")
|
|
self._refresh_token = refresh_token.strip() if refresh_token else None
|
|
self._expires_at = expires_at
|
|
self._refresh_buffer = 300 # Refresh 5 minutes before expiration
|
|
|
|
def get_headers(self) -> Dict[str, str]:
|
|
"""Get authentication headers with JWT token.
|
|
|
|
Automatically attempts to refresh the token if it's expired.
|
|
|
|
Returns:
|
|
Dict[str, str]: Headers containing the Authorization header.
|
|
|
|
Raises:
|
|
AuthenticationError: If token is expired and cannot be refreshed.
|
|
"""
|
|
# Try to refresh if token is near expiration
|
|
if not self.is_valid():
|
|
self.refresh()
|
|
|
|
return {
|
|
"Authorization": f"Bearer {self._token}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
|
|
def is_valid(self) -> bool:
|
|
"""Check if JWT token is valid and not expired.
|
|
|
|
Returns:
|
|
bool: True if token exists and is not expired, False otherwise.
|
|
"""
|
|
if not self._token or not self._token.strip():
|
|
return False
|
|
|
|
# If no expiration time is set, assume token is valid
|
|
if self._expires_at is None:
|
|
return True
|
|
|
|
# Check if token is expired (with buffer for refresh)
|
|
current_time = time.time()
|
|
return current_time < (self._expires_at - self._refresh_buffer)
|
|
|
|
def refresh(self) -> None:
|
|
"""Refresh the JWT token using the refresh token.
|
|
|
|
This method attempts to refresh the JWT token using the refresh token
|
|
by making a request to the Wiki.js authentication endpoint.
|
|
|
|
Raises:
|
|
AuthenticationError: If refresh token is not available or refresh fails.
|
|
"""
|
|
import requests
|
|
from ..exceptions import AuthenticationError
|
|
|
|
if not self._refresh_token:
|
|
raise AuthenticationError(
|
|
"JWT token expired and no refresh token available"
|
|
)
|
|
|
|
try:
|
|
# Make request to Wiki.js token refresh endpoint
|
|
refresh_url = f"{self._base_url}/api/auth/refresh"
|
|
|
|
response = requests.post(
|
|
refresh_url,
|
|
json={"refreshToken": self._refresh_token},
|
|
headers={
|
|
"Content-Type": "application/json",
|
|
"Accept": "application/json",
|
|
},
|
|
timeout=30,
|
|
)
|
|
|
|
# Check if request was successful
|
|
if not response.ok:
|
|
error_msg = "Token refresh failed"
|
|
try:
|
|
error_data = response.json()
|
|
error_msg = error_data.get("message", error_msg)
|
|
except Exception:
|
|
pass
|
|
|
|
raise AuthenticationError(
|
|
f"Failed to refresh JWT token: {error_msg} (HTTP {response.status_code})"
|
|
)
|
|
|
|
# Parse response
|
|
data = response.json()
|
|
|
|
# Update token and expiration
|
|
if "token" in data:
|
|
self._token = data["token"]
|
|
|
|
if "expiresAt" in data:
|
|
self._expires_at = data["expiresAt"]
|
|
elif "expires_at" in data:
|
|
self._expires_at = data["expires_at"]
|
|
|
|
# Optionally update refresh token if a new one is provided
|
|
if "refreshToken" in data:
|
|
self._refresh_token = data["refreshToken"]
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
raise AuthenticationError(
|
|
f"Failed to refresh JWT token: {str(e)}"
|
|
) from e
|
|
except Exception as e:
|
|
raise AuthenticationError(
|
|
f"Unexpected error during token refresh: {str(e)}"
|
|
) from e
|
|
|
|
def is_expired(self) -> bool:
|
|
"""Check if the JWT token is expired.
|
|
|
|
Returns:
|
|
bool: True if token is expired, False otherwise.
|
|
"""
|
|
if self._expires_at is None:
|
|
return False
|
|
|
|
return time.time() >= self._expires_at
|
|
|
|
def time_until_expiry(self) -> Optional[timedelta]:
|
|
"""Get time until token expires.
|
|
|
|
Returns:
|
|
Optional[timedelta]: Time until expiration, or None if no expiration set.
|
|
"""
|
|
if self._expires_at is None:
|
|
return None
|
|
|
|
remaining_seconds = self._expires_at - time.time()
|
|
return timedelta(seconds=max(0, remaining_seconds))
|
|
|
|
@property
|
|
def token_preview(self) -> str:
|
|
"""Get a preview of the JWT token for logging/debugging.
|
|
|
|
Returns:
|
|
str: Masked token showing only first and last few characters.
|
|
"""
|
|
if not self._token:
|
|
return "None"
|
|
|
|
if len(self._token) <= 20:
|
|
return "*" * len(self._token)
|
|
|
|
return f"{self._token[:10]}...{self._token[-10:]}"
|
|
|
|
def __repr__(self) -> str:
|
|
"""String representation of the auth handler.
|
|
|
|
Returns:
|
|
str: Safe representation with masked token.
|
|
"""
|
|
return f"JWTAuth(token='{self.token_preview}', expires_at={self._expires_at})"
|