"""JWT token authentication for wikijs-python-sdk.

This module implements JWT (JSON Web Token) authentication for Wiki.js instances.
JWT tokens are typically used for user-based authentication and have expiration times.
"""

import time
from datetime import timedelta
from typing import Dict, Optional

from .base import AuthHandler


class JWTAuth(AuthHandler):
    """JWT token authentication handler for Wiki.js.

    This handler manages JWT tokens with automatic refresh capabilities.
    JWT tokens typically expire and need to be refreshed periodically.

    Args:
        token: The JWT token string.
        base_url: The base URL of the Wiki.js instance (needed for token refresh).
        refresh_token: Optional refresh token for automatic renewal.
        expires_at: Optional expiration timestamp (Unix timestamp).

    Example:
        >>> auth = JWTAuth(
        ...     token="eyJ0eXAiOiJKV1QiLCJhbGc...",
        ...     base_url="https://wiki.example.com",
        ...     refresh_token="refresh_token_here"
        ... )
        >>> client = WikiJSClient("https://wiki.example.com", auth=auth)
    """

    def __init__(
        self,
        token: str,
        base_url: str,
        refresh_token: Optional[str] = None,
        expires_at: Optional[float] = None,
    ) -> None:
        """Initialize JWT authentication.

        Surrounding whitespace is stripped from ``token``, ``base_url`` and
        ``refresh_token``; trailing slashes are removed from ``base_url`` so
        it can be joined with an endpoint path.

        Args:
            token: The JWT token string.
            base_url: The base URL of the Wiki.js instance.
            refresh_token: Optional refresh token for automatic renewal.
            expires_at: Optional expiration timestamp (Unix timestamp).

        Raises:
            ValueError: If token or base_url is empty or None.
        """
        if not token or not token.strip():
            raise ValueError("JWT token cannot be empty")

        if not base_url or not base_url.strip():
            raise ValueError("Base URL cannot be empty")

        self._token = token.strip()
        self._base_url = base_url.strip().rstrip("/")
        self._refresh_token = refresh_token.strip() if refresh_token else None
        self._expires_at = expires_at
        self._refresh_buffer = 300  # Refresh 5 minutes before expiration

    def get_headers(self) -> Dict[str, str]:
        """Get authentication headers with JWT token.

        Calls :meth:`refresh` first whenever :meth:`is_valid` reports False,
        i.e. when the token is missing or is within the refresh buffer of
        its expiration time.

        Returns:
            Dict[str, str]: Headers containing the Authorization header.

        Raises:
            AuthenticationError: If token is expired and cannot be refreshed.
        """
        # Try to refresh if token is near expiration
        if not self.is_valid():
            self.refresh()

        return {
            "Authorization": f"Bearer {self._token}",
            "Content-Type": "application/json",
        }

    def is_valid(self) -> bool:
        """Check if JWT token is valid and not expired.

        Unlike :meth:`is_expired`, this applies the refresh buffer: a token
        that expires within the next ``_refresh_buffer`` seconds is already
        reported as not valid.

        Returns:
            bool: True if token exists and is not expired, False otherwise.
        """
        if not self._token or not self._token.strip():
            return False

        # If no expiration time is set, assume token is valid
        if self._expires_at is None:
            return True

        # Check if token is expired (with buffer for refresh)
        current_time = time.time()
        return current_time < (self._expires_at - self._refresh_buffer)

    def refresh(self) -> None:
        """Refresh the JWT token using the refresh token.

        Sends a POST request with the refresh token to
        ``{base_url}/api/auth/refresh`` and updates the stored token,
        expiration time and refresh token from whichever of the keys
        ``token``, ``expiresAt`` / ``expires_at`` and ``refreshToken`` are
        present in the JSON response. Keys that are absent leave the
        corresponding stored value unchanged.

        Raises:
            AuthenticationError: If refresh token is not available, the
                server answers with a non-success status, the HTTP request
                fails, or any other error occurs while refreshing.
        """
        # Imported lazily, as in the original implementation.
        import requests

        from ..exceptions import AuthenticationError

        if not self._refresh_token:
            raise AuthenticationError(
                "JWT token expired and no refresh token available"
            )

        try:
            # Make request to Wiki.js token refresh endpoint
            refresh_url = f"{self._base_url}/api/auth/refresh"

            response = requests.post(
                refresh_url,
                json={"refreshToken": self._refresh_token},
                headers={
                    "Content-Type": "application/json",
                    "Accept": "application/json",
                },
                timeout=30,
            )

            # Check if request was successful
            if not response.ok:
                error_msg = "Token refresh failed"
                try:
                    error_data = response.json()
                    error_msg = error_data.get("message", error_msg)
                except Exception:
                    # Deliberate best-effort: the error body may be missing,
                    # not JSON, or not an object; keep the default message.
                    pass

                raise AuthenticationError(
                    f"Failed to refresh JWT token: {error_msg} (HTTP {response.status_code})"
                )

            # Parse response
            data = response.json()

            # Update token and expiration
            if "token" in data:
                self._token = data["token"]

            if "expiresAt" in data:
                self._expires_at = data["expiresAt"]
            elif "expires_at" in data:
                self._expires_at = data["expires_at"]

            # Optionally update refresh token if a new one is provided
            if "refreshToken" in data:
                self._refresh_token = data["refreshToken"]

        except AuthenticationError:
            # Already the right error type with a precise message (raised
            # above for a non-success response). Without this clause the
            # generic handler below would re-wrap it as an "Unexpected error".
            raise
        except requests.exceptions.RequestException as e:
            raise AuthenticationError(
                f"Failed to refresh JWT token: {str(e)}"
            ) from e
        except Exception as e:
            raise AuthenticationError(
                f"Unexpected error during token refresh: {str(e)}"
            ) from e

    def is_expired(self) -> bool:
        """Check if the JWT token is expired.

        This compares against the raw expiration time, without the refresh
        buffer used by :meth:`is_valid`.

        Returns:
            bool: True if token is expired, False otherwise (including when
                no expiration time is set).
        """
        if self._expires_at is None:
            return False

        return time.time() >= self._expires_at

    def time_until_expiry(self) -> Optional[timedelta]:
        """Get time until token expires.

        Returns:
            Optional[timedelta]: Time until expiration (never negative; zero
                once the token has expired), or None if no expiration set.
        """
        if self._expires_at is None:
            return None

        remaining_seconds = self._expires_at - time.time()
        return timedelta(seconds=max(0, remaining_seconds))

    @property
    def token_preview(self) -> str:
        """Get a preview of the JWT token for logging/debugging.

        Tokens of 20 characters or fewer are fully masked with ``*``;
        longer tokens show their first and last 10 characters.

        Returns:
            str: Masked token showing only first and last few characters.
        """
        if not self._token:
            return "None"

        if len(self._token) <= 20:
            return "*" * len(self._token)

        return f"{self._token[:10]}...{self._token[-10:]}"

    def __repr__(self) -> str:
        """String representation of the auth handler.

        Returns:
            str: Safe representation with masked token.
        """
        return f"JWTAuth(token='{self.token_preview}', expires_at={self._expires_at})"