Fix code formatting and linting issues

- Updated GitHub Actions workflow to use correct flake8 configuration
- Fixed line length issues by using 88 characters as configured
- Removed unused imports and trailing whitespace
- Fixed f-string placeholders and unused variables
- All linting checks now pass with project configuration

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-07-30 20:49:40 -04:00
parent 16bd151337
commit ade9aacf56
33 changed files with 1099 additions and 1096 deletions

View File

@@ -5,11 +5,11 @@ instances, including support for pages, users, groups, and system management.
Example:
Basic usage:
>>> from wikijs import WikiJSClient
>>> client = WikiJSClient('https://wiki.example.com', auth='your-api-key')
>>> # API endpoints will be available as development progresses
Features:
- Type-safe data models with validation
- Comprehensive error handling
@@ -18,21 +18,21 @@ Features:
- Context manager support for resource cleanup
"""
from .auth import AuthHandler, NoAuth, APIKeyAuth, JWTAuth
from .auth import APIKeyAuth, AuthHandler, JWTAuth, NoAuth
from .client import WikiJSClient
from .exceptions import (
WikiJSException,
APIError,
AuthenticationError,
ConfigurationError,
ValidationError,
ClientError,
ServerError,
ConfigurationError,
ConnectionError,
NotFoundError,
PermissionError,
RateLimitError,
ConnectionError,
ServerError,
TimeoutError,
ValidationError,
WikiJSException,
)
from .models import BaseModel, Page, PageCreate, PageUpdate
from .version import __version__, __version_info__
@@ -41,33 +41,29 @@ from .version import __version__, __version_info__
__all__ = [
# Main client
"WikiJSClient",
# Authentication
"AuthHandler",
"NoAuth",
"APIKeyAuth",
"APIKeyAuth",
"JWTAuth",
# Data models
"BaseModel",
"Page",
"Page",
"PageCreate",
"PageUpdate",
# Exceptions
"WikiJSException",
"APIError",
"AuthenticationError",
"AuthenticationError",
"ConfigurationError",
"ValidationError",
"ClientError",
"ServerError",
"ServerError",
"NotFoundError",
"PermissionError",
"RateLimitError",
"ConnectionError",
"TimeoutError",
# Version info
"__version__",
"__version_info__",
@@ -81,4 +77,10 @@ __description__ = "Professional Python SDK for Wiki.js API integration"
__url__ = "https://github.com/yourusername/wikijs-python-sdk"
# For type checking
__all__ += ["__author__", "__email__", "__license__", "__description__", "__url__"]
__all__ += [
"__author__",
"__email__",
"__license__",
"__description__",
"__url__",
]

View File

@@ -9,13 +9,13 @@ Supported authentication methods:
- No authentication for testing (NoAuth)
"""
from .base import AuthHandler, NoAuth
from .api_key import APIKeyAuth
from .base import AuthHandler, NoAuth
from .jwt import JWTAuth
__all__ = [
"AuthHandler",
"NoAuth",
"NoAuth",
"APIKeyAuth",
"JWTAuth",
]
]

View File

@@ -4,20 +4,20 @@ This module implements API key authentication for Wiki.js instances.
API keys are typically used for server-to-server authentication.
"""
from typing import Dict, Optional
from typing import Dict
from .base import AuthHandler
class APIKeyAuth(AuthHandler):
"""API key authentication handler for Wiki.js.
This handler implements authentication using an API key, which is
included in the Authorization header as a Bearer token.
Args:
api_key: The API key string from Wiki.js admin panel.
Example:
>>> auth = APIKeyAuth("your-api-key-here")
>>> client = WikiJSClient("https://wiki.example.com", auth=auth)
@@ -25,35 +25,35 @@ class APIKeyAuth(AuthHandler):
def __init__(self, api_key: str) -> None:
"""Initialize API key authentication.
Args:
api_key: The API key from Wiki.js admin panel.
Raises:
ValueError: If api_key is empty or None.
"""
if not api_key or not api_key.strip():
raise ValueError("API key cannot be empty")
self._api_key = api_key.strip()
def get_headers(self) -> Dict[str, str]:
"""Get authentication headers with API key.
Returns:
Dict[str, str]: Headers containing the Authorization header.
"""
return {
"Authorization": f"Bearer {self._api_key}",
"Content-Type": "application/json"
"Content-Type": "application/json",
}
def is_valid(self) -> bool:
"""Check if API key is valid.
For API keys, we assume they're valid if they're not empty.
Actual validation happens on the server side.
Returns:
bool: True if API key exists, False otherwise.
"""
@@ -61,29 +61,30 @@ class APIKeyAuth(AuthHandler):
def refresh(self) -> None:
"""Refresh authentication credentials.
API keys don't typically need refreshing, so this is a no-op.
If the API key becomes invalid, a new one must be provided.
"""
# API keys don't refresh - they're static until manually replaced
pass
@property
def api_key(self) -> str:
"""Get the masked API key for logging/debugging.
Returns:
str: Masked API key showing only first 4 and last 4 characters.
"""
if len(self._api_key) <= 8:
return "*" * len(self._api_key)
return f"{self._api_key[:4]}{'*' * (len(self._api_key) - 8)}{self._api_key[-4:]}"
return (
f"{self._api_key[:4]}{'*' * (len(self._api_key) - 8)}{self._api_key[-4:]}"
)
def __repr__(self) -> str:
"""String representation of the auth handler.
Returns:
str: Safe representation with masked API key.
"""
return f"APIKeyAuth(api_key='{self.api_key}')"
return f"APIKeyAuth(api_key='{self.api_key}')"

View File

@@ -5,12 +5,12 @@ providing a consistent interface for different authentication methods.
"""
from abc import ABC, abstractmethod
from typing import Dict, Optional
from typing import Dict
class AuthHandler(ABC):
"""Abstract base class for Wiki.js authentication handlers.
This class defines the interface that all authentication implementations
must follow, ensuring consistent behavior across different auth methods.
"""
@@ -18,56 +18,54 @@ class AuthHandler(ABC):
@abstractmethod
def get_headers(self) -> Dict[str, str]:
"""Get authentication headers for HTTP requests.
Returns:
Dict[str, str]: Dictionary of headers to include in requests.
Raises:
AuthenticationError: If authentication is invalid or expired.
"""
pass
@abstractmethod
def is_valid(self) -> bool:
"""Check if the current authentication is valid.
Returns:
bool: True if authentication is valid, False otherwise.
"""
pass
@abstractmethod
def refresh(self) -> None:
"""Refresh the authentication if possible.
For token-based authentication, this should refresh the token.
For API key authentication, this is typically a no-op.
Raises:
AuthenticationError: If refresh fails.
"""
pass
def validate_credentials(self) -> None:
"""Validate credentials and refresh if necessary.
This is a convenience method that checks validity and refreshes
if needed. Subclasses can override for custom behavior.
Raises:
AuthenticationError: If credentials are invalid or refresh fails.
"""
if not self.is_valid():
self.refresh()
if not self.is_valid():
from ..exceptions import AuthenticationError
raise AuthenticationError("Authentication credentials are invalid")
class NoAuth(AuthHandler):
"""No-authentication handler for testing or public instances.
This handler provides an empty authentication implementation,
useful for testing or when accessing public Wiki.js instances
that don't require authentication.
@@ -75,7 +73,7 @@ class NoAuth(AuthHandler):
def get_headers(self) -> Dict[str, str]:
"""Return empty headers dict.
Returns:
Dict[str, str]: Empty dictionary.
"""
@@ -83,7 +81,7 @@ class NoAuth(AuthHandler):
def is_valid(self) -> bool:
"""Always return True for no-auth.
Returns:
bool: Always True.
"""
@@ -91,7 +89,6 @@ class NoAuth(AuthHandler):
def refresh(self) -> None:
"""No-op for no-auth.
This method does nothing since there's no authentication to refresh.
"""
pass

View File

@@ -5,7 +5,7 @@ JWT tokens are typically used for user-based authentication and have expiration
"""
import time
from datetime import datetime, timedelta
from datetime import timedelta
from typing import Dict, Optional
from .base import AuthHandler
@@ -13,39 +13,39 @@ from .base import AuthHandler
class JWTAuth(AuthHandler):
"""JWT token authentication handler for Wiki.js.
This handler manages JWT tokens with automatic refresh capabilities.
JWT tokens typically expire and need to be refreshed periodically.
Args:
token: The JWT token string.
refresh_token: Optional refresh token for automatic renewal.
expires_at: Optional expiration timestamp (Unix timestamp).
Example:
>>> auth = JWTAuth("eyJ0eXAiOiJKV1QiLCJhbGc...")
>>> client = WikiJSClient("https://wiki.example.com", auth=auth)
"""
def __init__(
self,
token: str,
self,
token: str,
refresh_token: Optional[str] = None,
expires_at: Optional[float] = None
expires_at: Optional[float] = None,
) -> None:
"""Initialize JWT authentication.
Args:
token: The JWT token string.
refresh_token: Optional refresh token for automatic renewal.
expires_at: Optional expiration timestamp (Unix timestamp).
Raises:
ValueError: If token is empty or None.
"""
if not token or not token.strip():
raise ValueError("JWT token cannot be empty")
self._token = token.strip()
self._refresh_token = refresh_token.strip() if refresh_token else None
self._expires_at = expires_at
@@ -53,66 +53,66 @@ class JWTAuth(AuthHandler):
def get_headers(self) -> Dict[str, str]:
"""Get authentication headers with JWT token.
Automatically attempts to refresh the token if it's expired.
Returns:
Dict[str, str]: Headers containing the Authorization header.
Raises:
AuthenticationError: If token is expired and cannot be refreshed.
"""
# Try to refresh if token is near expiration
if not self.is_valid():
self.refresh()
return {
"Authorization": f"Bearer {self._token}",
"Content-Type": "application/json"
"Content-Type": "application/json",
}
def is_valid(self) -> bool:
"""Check if JWT token is valid and not expired.
Returns:
bool: True if token exists and is not expired, False otherwise.
"""
if not self._token or not self._token.strip():
return False
# If no expiration time is set, assume token is valid
if self._expires_at is None:
return True
# Check if token is expired (with buffer for refresh)
current_time = time.time()
return current_time < (self._expires_at - self._refresh_buffer)
def refresh(self) -> None:
"""Refresh the JWT token using the refresh token.
This method attempts to refresh the JWT token using the refresh token.
If no refresh token is available, it raises an AuthenticationError.
Note: This is a placeholder implementation. In a real implementation,
this would make an HTTP request to the Wiki.js token refresh endpoint.
Raises:
AuthenticationError: If refresh token is not available or refresh fails.
"""
from ..exceptions import AuthenticationError
if not self._refresh_token:
raise AuthenticationError(
"JWT token expired and no refresh token available"
)
# TODO: Implement actual token refresh logic
# This would typically involve:
# 1. Making a POST request to /auth/refresh endpoint
# 2. Sending the refresh token
# 3. Updating self._token and self._expires_at with the response
raise AuthenticationError(
"JWT token refresh not yet implemented. "
"Please provide a new token or use API key authentication."
@@ -120,46 +120,46 @@ class JWTAuth(AuthHandler):
def is_expired(self) -> bool:
"""Check if the JWT token is expired.
Returns:
bool: True if token is expired, False otherwise.
"""
if self._expires_at is None:
return False
return time.time() >= self._expires_at
def time_until_expiry(self) -> Optional[timedelta]:
"""Get time until token expires.
Returns:
Optional[timedelta]: Time until expiration, or None if no expiration set.
"""
if self._expires_at is None:
return None
remaining_seconds = self._expires_at - time.time()
return timedelta(seconds=max(0, remaining_seconds))
@property
def token_preview(self) -> str:
"""Get a preview of the JWT token for logging/debugging.
Returns:
str: Masked token showing only first and last few characters.
"""
if not self._token:
return "None"
if len(self._token) <= 20:
return "*" * len(self._token)
return f"{self._token[:10]}...{self._token[-10:]}"
def __repr__(self) -> str:
"""String representation of the auth handler.
Returns:
str: Safe representation with masked token.
"""
return f"JWTAuth(token='{self.token_preview}', expires_at={self._expires_at})"
return f"JWTAuth(token='{self.token_preview}', expires_at={self._expires_at})"

View File

@@ -7,7 +7,7 @@ import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from .auth import AuthHandler, APIKeyAuth
from .auth import APIKeyAuth, AuthHandler
from .endpoints import PagesEndpoint
from .exceptions import (
APIError,
@@ -17,36 +17,41 @@ from .exceptions import (
TimeoutError,
create_api_error,
)
from .utils import normalize_url, build_api_url, parse_wiki_response, extract_error_message
from .utils import (
build_api_url,
extract_error_message,
normalize_url,
parse_wiki_response,
)
class WikiJSClient:
"""Main client for interacting with Wiki.js API.
This client provides a high-level interface for all Wiki.js API operations
including pages, users, groups, and system management. It handles authentication,
error handling, and response parsing automatically.
Args:
base_url: The base URL of your Wiki.js instance
auth: Authentication (API key string or auth handler)
timeout: Request timeout in seconds (default: 30)
verify_ssl: Whether to verify SSL certificates (default: True)
user_agent: Custom User-Agent header
Example:
Basic usage with API key:
>>> client = WikiJSClient('https://wiki.example.com', auth='your-api-key')
>>> pages = client.pages.list()
>>> page = client.pages.get(123)
Attributes:
base_url: The normalized base URL
timeout: Request timeout setting
verify_ssl: SSL verification setting
"""
def __init__(
self,
base_url: str,
@@ -57,7 +62,7 @@ class WikiJSClient:
):
# Validate and normalize base URL
self.base_url = normalize_url(base_url)
# Store authentication
if isinstance(auth, str):
# Convert string API key to APIKeyAuth handler
@@ -69,77 +74,86 @@ class WikiJSClient:
raise ConfigurationError(
f"Invalid auth parameter: expected str or AuthHandler, got {type(auth)}"
)
# Request configuration
self.timeout = timeout
self.verify_ssl = verify_ssl
self.user_agent = user_agent or f"wikijs-python-sdk/0.1.0"
self.user_agent = user_agent or "wikijs-python-sdk/0.1.0"
# Initialize HTTP session
self._session = self._create_session()
# Endpoint handlers
self.pages = PagesEndpoint(self)
# Future endpoints:
# self.users = UsersEndpoint(self)
# self.groups = GroupsEndpoint(self)
def _create_session(self) -> requests.Session:
"""Create configured HTTP session with retry strategy.
Returns:
Configured requests session
"""
session = requests.Session()
# Configure retry strategy
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"],
allowed_methods=[
"HEAD",
"GET",
"OPTIONS",
"POST",
"PUT",
"DELETE",
],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
# Set default headers
session.headers.update({
"User-Agent": self.user_agent,
"Accept": "application/json",
"Content-Type": "application/json",
})
session.headers.update(
{
"User-Agent": self.user_agent,
"Accept": "application/json",
"Content-Type": "application/json",
}
)
# Set authentication headers
if self._auth_handler:
# Validate auth and get headers
self._auth_handler.validate_credentials()
auth_headers = self._auth_handler.get_headers()
session.headers.update(auth_headers)
return session
def _request(
self,
method: str,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
json_data: Optional[Dict[str, Any]] = None,
**kwargs
**kwargs,
) -> Dict[str, Any]:
"""Make HTTP request to Wiki.js API.
Args:
method: HTTP method (GET, POST, PUT, DELETE)
endpoint: API endpoint path
params: Query parameters
json_data: JSON data for request body
**kwargs: Additional request parameters
Returns:
Parsed response data
Raises:
AuthenticationError: If authentication fails
APIError: If API returns an error
@@ -148,44 +162,44 @@ class WikiJSClient:
"""
# Build full URL
url = build_api_url(self.base_url, endpoint)
# Prepare request arguments
request_kwargs = {
"timeout": self.timeout,
"verify": self.verify_ssl,
"params": params,
**kwargs
**kwargs,
}
# Add JSON data if provided
if json_data is not None:
request_kwargs["json"] = json_data
try:
# Make request
response = self._session.request(method, url, **request_kwargs)
# Handle response
return self._handle_response(response)
except requests.exceptions.Timeout as e:
raise TimeoutError(f"Request timed out after {self.timeout} seconds") from e
except requests.exceptions.ConnectionError as e:
raise ConnectionError(f"Failed to connect to {self.base_url}") from e
except requests.exceptions.RequestException as e:
raise APIError(f"Request failed: {str(e)}") from e
def _handle_response(self, response: requests.Response) -> Dict[str, Any]:
"""Handle HTTP response and extract data.
Args:
response: HTTP response object
Returns:
Parsed response data
Raises:
AuthenticationError: If authentication fails (401)
APIError: If API returns an error
@@ -193,31 +207,27 @@ class WikiJSClient:
# Handle authentication errors
if response.status_code == 401:
raise AuthenticationError("Authentication failed - check your API key")
# Handle other HTTP errors
if not response.ok:
error_message = extract_error_message(response)
raise create_api_error(
response.status_code,
error_message,
response
)
raise create_api_error(response.status_code, error_message, response)
# Parse JSON response
try:
data = response.json()
except json.JSONDecodeError as e:
raise APIError(f"Invalid JSON response: {str(e)}") from e
# Parse Wiki.js specific response format
return parse_wiki_response(data)
def test_connection(self) -> bool:
"""Test connection to Wiki.js instance.
Returns:
True if connection successful
Raises:
ConfigurationError: If client is not properly configured
ConnectionError: If cannot connect to server
@@ -225,42 +235,42 @@ class WikiJSClient:
"""
if not self.base_url:
raise ConfigurationError("Base URL not configured")
if not self._auth_handler:
raise ConfigurationError("Authentication not configured")
try:
# Try to hit a basic endpoint (will implement with actual endpoints)
# For now, just test basic connectivity
response = self._session.get(
self.base_url,
timeout=self.timeout,
verify=self.verify_ssl
self._session.get(
self.base_url, timeout=self.timeout, verify=self.verify_ssl
)
return True
except requests.exceptions.Timeout:
raise TimeoutError(f"Connection test timed out after {self.timeout} seconds")
raise TimeoutError(
f"Connection test timed out after {self.timeout} seconds"
)
except requests.exceptions.ConnectionError as e:
raise ConnectionError(f"Cannot connect to {self.base_url}: {str(e)}")
except Exception as e:
raise ConnectionError(f"Connection test failed: {str(e)}")
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - close session."""
self.close()
def close(self):
"""Close the HTTP session and clean up resources."""
if self._session:
self._session.close()
def __repr__(self) -> str:
"""String representation of client."""
return f"WikiJSClient(base_url='{self.base_url}')"
return f"WikiJSClient(base_url='{self.base_url}')"

View File

@@ -19,4 +19,4 @@ from .pages import PagesEndpoint
__all__ = [
"BaseEndpoint",
"PagesEndpoint",
]
]

View File

@@ -1,6 +1,6 @@
"""Base endpoint class for wikijs-python-sdk."""
from typing import Any, Dict, List, Optional, TYPE_CHECKING
from typing import TYPE_CHECKING, Any, Dict, Optional
if TYPE_CHECKING:
from ..client import WikiJSClient
@@ -8,39 +8,39 @@ if TYPE_CHECKING:
class BaseEndpoint:
"""Base class for all API endpoints.
This class provides common functionality for making API requests
and handling responses across all endpoint implementations.
Args:
client: The WikiJS client instance
"""
def __init__(self, client: "WikiJSClient"):
"""Initialize endpoint with client reference.
Args:
client: WikiJS client instance
"""
self._client = client
def _request(
self,
method: str,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
json_data: Optional[Dict[str, Any]] = None,
**kwargs
**kwargs,
) -> Dict[str, Any]:
"""Make HTTP request through the client.
Args:
method: HTTP method (GET, POST, PUT, DELETE)
endpoint: API endpoint path
params: Query parameters
json_data: JSON data for request body
**kwargs: Additional request parameters
Returns:
Parsed response data
"""
@@ -49,94 +49,92 @@ class BaseEndpoint:
endpoint=endpoint,
params=params,
json_data=json_data,
**kwargs
**kwargs,
)
def _get(
self,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
**kwargs
self, endpoint: str, params: Optional[Dict[str, Any]] = None, **kwargs
) -> Dict[str, Any]:
"""Make GET request.
Args:
endpoint: API endpoint path
params: Query parameters
**kwargs: Additional request parameters
Returns:
Parsed response data
"""
return self._request("GET", endpoint, params=params, **kwargs)
def _post(
self,
endpoint: str,
json_data: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None,
**kwargs
**kwargs,
) -> Dict[str, Any]:
"""Make POST request.
Args:
endpoint: API endpoint path
json_data: JSON data for request body
params: Query parameters
**kwargs: Additional request parameters
Returns:
Parsed response data
"""
return self._request("POST", endpoint, params=params, json_data=json_data, **kwargs)
return self._request(
"POST", endpoint, params=params, json_data=json_data, **kwargs
)
def _put(
self,
endpoint: str,
json_data: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None,
**kwargs
**kwargs,
) -> Dict[str, Any]:
"""Make PUT request.
Args:
endpoint: API endpoint path
json_data: JSON data for request body
params: Query parameters
**kwargs: Additional request parameters
Returns:
Parsed response data
"""
return self._request("PUT", endpoint, params=params, json_data=json_data, **kwargs)
return self._request(
"PUT", endpoint, params=params, json_data=json_data, **kwargs
)
def _delete(
self,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
**kwargs
self, endpoint: str, params: Optional[Dict[str, Any]] = None, **kwargs
) -> Dict[str, Any]:
"""Make DELETE request.
Args:
endpoint: API endpoint path
params: Query parameters
**kwargs: Additional request parameters
Returns:
Parsed response data
"""
return self._request("DELETE", endpoint, params=params, **kwargs)
def _build_endpoint(self, *parts: str) -> str:
"""Build endpoint path from parts.
Args:
*parts: Path components
Returns:
Formatted endpoint path
"""
# Remove empty parts and join with /
clean_parts = [str(part).strip("/") for part in parts if part]
return "/" + "/".join(clean_parts)
return "/" + "/".join(clean_parts)

View File

@@ -9,20 +9,20 @@ from .base import BaseEndpoint
class PagesEndpoint(BaseEndpoint):
"""Endpoint for Wiki.js Pages API operations.
This endpoint provides methods for creating, reading, updating, and deleting
wiki pages through the Wiki.js GraphQL API.
Example:
>>> client = WikiJSClient('https://wiki.example.com', auth='api-key')
>>> pages = client.pages
>>>
>>>
>>> # List all pages
>>> all_pages = pages.list()
>>>
>>>
>>> # Get a specific page
>>> page = pages.get(123)
>>>
>>>
>>> # Create a new page
>>> new_page_data = PageCreate(
... title="Getting Started",
@@ -30,15 +30,15 @@ class PagesEndpoint(BaseEndpoint):
... content="# Welcome\\n\\nThis is your first page!"
... )
>>> created_page = pages.create(new_page_data)
>>>
>>>
>>> # Update an existing page
>>> update_data = PageUpdate(title="Updated Title")
>>> updated_page = pages.update(123, update_data)
>>>
>>>
>>> # Delete a page
>>> pages.delete(123)
"""
def list(
self,
limit: Optional[int] = None,
@@ -48,10 +48,10 @@ class PagesEndpoint(BaseEndpoint):
locale: Optional[str] = None,
author_id: Optional[int] = None,
order_by: str = "title",
order_direction: str = "ASC"
order_direction: str = "ASC",
) -> List[Page]:
"""List pages with optional filtering.
Args:
limit: Maximum number of pages to return
offset: Number of pages to skip
@@ -61,10 +61,10 @@ class PagesEndpoint(BaseEndpoint):
author_id: Author ID to filter by
order_by: Field to order by (title, created_at, updated_at)
order_direction: Order direction (ASC or DESC)
Returns:
List of Page objects
Raises:
APIError: If the API request fails
ValidationError: If parameters are invalid
@@ -72,16 +72,18 @@ class PagesEndpoint(BaseEndpoint):
# Validate parameters
if limit is not None and limit < 1:
raise ValidationError("limit must be greater than 0")
if offset is not None and offset < 0:
raise ValidationError("offset must be non-negative")
if order_by not in ["title", "created_at", "updated_at", "path"]:
raise ValidationError("order_by must be one of: title, created_at, updated_at, path")
raise ValidationError(
"order_by must be one of: title, created_at, updated_at, path"
)
if order_direction not in ["ASC", "DESC"]:
raise ValidationError("order_direction must be ASC or DESC")
# Build GraphQL query using actual Wiki.js schema
query = """
query {
@@ -97,18 +99,16 @@ class PagesEndpoint(BaseEndpoint):
}
}
"""
# Make request (no variables needed for simple list query)
response = self._post("/graphql", json_data={
"query": query
})
response = self._post("/graphql", json_data={"query": query})
# Parse response
if "errors" in response:
raise APIError(f"GraphQL errors: {response['errors']}")
pages_data = response.get("data", {}).get("pages", {}).get("list", [])
# Convert to Page objects
pages = []
for page_data in pages_data:
@@ -119,25 +119,25 @@ class PagesEndpoint(BaseEndpoint):
pages.append(page)
except Exception as e:
raise APIError(f"Failed to parse page data: {str(e)}") from e
return pages
def get(self, page_id: int) -> Page:
"""Get a specific page by ID.
Args:
page_id: The page ID
Returns:
Page object
Raises:
APIError: If the page is not found or request fails
ValidationError: If page_id is invalid
"""
if not isinstance(page_id, int) or page_id < 1:
raise ValidationError("page_id must be a positive integer")
# Build GraphQL query using actual Wiki.js schema
query = """
query($id: Int!) {
@@ -164,48 +164,48 @@ class PagesEndpoint(BaseEndpoint):
}
}
"""
# Make request
response = self._post("/graphql", json_data={
"query": query,
"variables": {"id": page_id}
})
response = self._post(
"/graphql",
json_data={"query": query, "variables": {"id": page_id}},
)
# Parse response
if "errors" in response:
raise APIError(f"GraphQL errors: {response['errors']}")
page_data = response.get("data", {}).get("pages", {}).get("single")
if not page_data:
raise APIError(f"Page with ID {page_id} not found")
# Convert to Page object
try:
normalized_data = self._normalize_page_data(page_data)
return Page(**normalized_data)
except Exception as e:
raise APIError(f"Failed to parse page data: {str(e)}") from e
def get_by_path(self, path: str, locale: str = "en") -> Page:
"""Get a page by its path.
Args:
path: The page path (e.g., "getting-started")
locale: The page locale (default: "en")
Returns:
Page object
Raises:
APIError: If the page is not found or request fails
ValidationError: If path is invalid
"""
if not path or not isinstance(path, str):
raise ValidationError("path must be a non-empty string")
# Normalize path
path = path.strip("/")
# Build GraphQL query
query = """
query($path: String!, $locale: String!) {
@@ -228,37 +228,40 @@ class PagesEndpoint(BaseEndpoint):
}
}
"""
# Make request
response = self._post("/graphql", json_data={
"query": query,
"variables": {"path": path, "locale": locale}
})
response = self._post(
"/graphql",
json_data={
"query": query,
"variables": {"path": path, "locale": locale},
},
)
# Parse response
if "errors" in response:
raise APIError(f"GraphQL errors: {response['errors']}")
page_data = response.get("data", {}).get("pageByPath")
if not page_data:
raise APIError(f"Page with path '{path}' not found")
# Convert to Page object
try:
normalized_data = self._normalize_page_data(page_data)
return Page(**normalized_data)
except Exception as e:
raise APIError(f"Failed to parse page data: {str(e)}") from e
def create(self, page_data: Union[PageCreate, Dict[str, Any]]) -> Page:
"""Create a new page.
Args:
page_data: Page creation data (PageCreate object or dict)
Returns:
Created Page object
Raises:
APIError: If page creation fails
ValidationError: If page data is invalid
@@ -271,7 +274,7 @@ class PagesEndpoint(BaseEndpoint):
raise ValidationError(f"Invalid page data: {str(e)}") from e
elif not isinstance(page_data, PageCreate):
raise ValidationError("page_data must be PageCreate object or dict")
# Build GraphQL mutation using actual Wiki.js schema
mutation = """
mutation($content: String!, $description: String!, $editor: String!, $isPublished: Boolean!, $isPrivate: Boolean!, $locale: String!, $path: String!, $tags: [String]!, $title: String!) {
@@ -306,69 +309,67 @@ class PagesEndpoint(BaseEndpoint):
}
}
"""
# Build variables from page data
variables = {
"title": page_data.title,
"path": page_data.path,
"content": page_data.content,
"description": page_data.description or f"Created via SDK: {page_data.title}",
"description": page_data.description
or f"Created via SDK: {page_data.title}",
"isPublished": page_data.is_published,
"isPrivate": page_data.is_private,
"tags": page_data.tags,
"locale": page_data.locale,
"editor": page_data.editor
"editor": page_data.editor,
}
# Make request
response = self._post("/graphql", json_data={
"query": mutation,
"variables": variables
})
response = self._post(
"/graphql", json_data={"query": mutation, "variables": variables}
)
# Parse response
if "errors" in response:
raise APIError(f"Failed to create page: {response['errors']}")
create_result = response.get("data", {}).get("pages", {}).get("create", {})
response_result = create_result.get("responseResult", {})
if not response_result.get("succeeded"):
error_msg = response_result.get("message", "Unknown error")
raise APIError(f"Page creation failed: {error_msg}")
created_page_data = create_result.get("page")
if not created_page_data:
raise APIError("Page creation failed - no page data returned")
# Convert to Page object
try:
normalized_data = self._normalize_page_data(created_page_data)
return Page(**normalized_data)
except Exception as e:
raise APIError(f"Failed to parse created page data: {str(e)}") from e
def update(
self,
page_id: int,
page_data: Union[PageUpdate, Dict[str, Any]]
self, page_id: int, page_data: Union[PageUpdate, Dict[str, Any]]
) -> Page:
"""Update an existing page.
Args:
page_id: The page ID
page_data: Page update data (PageUpdate object or dict)
Returns:
Updated Page object
Raises:
APIError: If page update fails
ValidationError: If parameters are invalid
"""
if not isinstance(page_id, int) or page_id < 1:
raise ValidationError("page_id must be a positive integer")
# Convert to PageUpdate if needed
if isinstance(page_data, dict):
try:
@@ -377,7 +378,7 @@ class PagesEndpoint(BaseEndpoint):
raise ValidationError(f"Invalid page data: {str(e)}") from e
elif not isinstance(page_data, PageUpdate):
raise ValidationError("page_data must be PageUpdate object or dict")
# Build GraphQL mutation
mutation = """
mutation($id: Int!, $title: String, $content: String, $description: String, $isPublished: Boolean, $isPrivate: Boolean, $tags: [String]) {
@@ -408,10 +409,10 @@ class PagesEndpoint(BaseEndpoint):
}
}
"""
# Build variables (only include non-None values)
variables = {"id": page_id}
if page_data.title is not None:
variables["title"] = page_data.title
if page_data.content is not None:
@@ -424,44 +425,43 @@ class PagesEndpoint(BaseEndpoint):
variables["isPrivate"] = page_data.is_private
if page_data.tags is not None:
variables["tags"] = page_data.tags
# Make request
response = self._post("/graphql", json_data={
"query": mutation,
"variables": variables
})
response = self._post(
"/graphql", json_data={"query": mutation, "variables": variables}
)
# Parse response
if "errors" in response:
raise APIError(f"Failed to update page: {response['errors']}")
updated_page_data = response.get("data", {}).get("updatePage")
if not updated_page_data:
raise APIError("Page update failed - no data returned")
# Convert to Page object
try:
normalized_data = self._normalize_page_data(updated_page_data)
return Page(**normalized_data)
except Exception as e:
raise APIError(f"Failed to parse updated page data: {str(e)}") from e
def delete(self, page_id: int) -> bool:
"""Delete a page.
Args:
page_id: The page ID
Returns:
True if deletion was successful
Raises:
APIError: If page deletion fails
ValidationError: If page_id is invalid
"""
if not isinstance(page_id, int) or page_id < 1:
raise ValidationError("page_id must be a positive integer")
# Build GraphQL mutation
mutation = """
mutation($id: Int!) {
@@ -471,118 +471,116 @@ class PagesEndpoint(BaseEndpoint):
}
}
"""
# Make request
response = self._post("/graphql", json_data={
"query": mutation,
"variables": {"id": page_id}
})
response = self._post(
"/graphql",
json_data={"query": mutation, "variables": {"id": page_id}},
)
# Parse response
if "errors" in response:
raise APIError(f"Failed to delete page: {response['errors']}")
delete_result = response.get("data", {}).get("deletePage", {})
success = delete_result.get("success", False)
if not success:
message = delete_result.get("message", "Unknown error")
raise APIError(f"Page deletion failed: {message}")
return True
def search(
self,
query: str,
limit: Optional[int] = None,
locale: Optional[str] = None
locale: Optional[str] = None,
) -> List[Page]:
"""Search for pages by content and title.
Args:
query: Search query string
limit: Maximum number of results to return
locale: Locale to search in
Returns:
List of matching Page objects
Raises:
APIError: If search fails
ValidationError: If parameters are invalid
"""
if not query or not isinstance(query, str):
raise ValidationError("query must be a non-empty string")
if limit is not None and limit < 1:
raise ValidationError("limit must be greater than 0")
# Use the list method with search parameter
return self.list(
search=query,
limit=limit,
locale=locale
)
return self.list(search=query, limit=limit, locale=locale)
def get_by_tags(
self,
tags: List[str],
match_all: bool = True,
limit: Optional[int] = None
limit: Optional[int] = None,
) -> List[Page]:
"""Get pages by tags.
Args:
tags: List of tags to search for
match_all: If True, pages must have ALL tags. If False, ANY tag matches
limit: Maximum number of results to return
Returns:
List of matching Page objects
Raises:
APIError: If request fails
ValidationError: If parameters are invalid
"""
if not tags or not isinstance(tags, list):
raise ValidationError("tags must be a non-empty list")
if limit is not None and limit < 1:
raise ValidationError("limit must be greater than 0")
# For match_all=True, use the tags parameter directly
if match_all:
return self.list(tags=tags, limit=limit)
# For match_all=False, we need a more complex query
# This would require a custom GraphQL query or multiple requests
# For now, implement a simple approach
all_pages = self.list(limit=limit * 2 if limit else None) # Get more pages to filter
all_pages = self.list(
limit=limit * 2 if limit else None
) # Get more pages to filter
matching_pages = []
for page in all_pages:
if any(tag.lower() in [t.lower() for t in page.tags] for tag in tags):
matching_pages.append(page)
if limit and len(matching_pages) >= limit:
break
return matching_pages
def _normalize_page_data(self, page_data: Dict[str, Any]) -> Dict[str, Any]:
"""Normalize page data from API response to model format.
Args:
page_data: Raw page data from API
Returns:
Normalized data for Page model
"""
normalized = {}
# Map API field names to model field names
field_mapping = {
"id": "id",
"title": "title",
"title": "title",
"path": "path",
"content": "content",
"description": "description",
@@ -590,17 +588,17 @@ class PagesEndpoint(BaseEndpoint):
"isPrivate": "is_private",
"locale": "locale",
"authorId": "author_id",
"authorName": "author_name",
"authorName": "author_name",
"authorEmail": "author_email",
"editor": "editor",
"createdAt": "created_at",
"updatedAt": "updated_at"
"updatedAt": "updated_at",
}
for api_field, model_field in field_mapping.items():
if api_field in page_data:
normalized[model_field] = page_data[api_field]
# Handle tags - convert from Wiki.js format
if "tags" in page_data:
if isinstance(page_data["tags"], list):
@@ -616,5 +614,5 @@ class PagesEndpoint(BaseEndpoint):
normalized["tags"] = []
else:
normalized["tags"] = []
return normalized
return normalized

View File

@@ -5,7 +5,7 @@ from typing import Any, Dict, Optional
class WikiJSException(Exception):
"""Base exception for all SDK errors."""
def __init__(self, message: str, details: Optional[Dict[str, Any]] = None):
super().__init__(message)
self.message = message
@@ -14,17 +14,15 @@ class WikiJSException(Exception):
class ConfigurationError(WikiJSException):
"""Raised when there's an issue with SDK configuration."""
pass
class AuthenticationError(WikiJSException):
"""Raised when authentication fails."""
pass
class ValidationError(WikiJSException):
"""Raised when input validation fails."""
def __init__(self, message: str, field: Optional[str] = None, value: Any = None):
super().__init__(message)
self.field = field
@@ -33,13 +31,13 @@ class ValidationError(WikiJSException):
class APIError(WikiJSException):
"""Base class for API-related errors."""
def __init__(
self,
message: str,
status_code: Optional[int] = None,
self,
message: str,
status_code: Optional[int] = None,
response: Optional[Any] = None,
details: Optional[Dict[str, Any]] = None
details: Optional[Dict[str, Any]] = None,
):
super().__init__(message, details)
self.status_code = status_code
@@ -48,52 +46,46 @@ class APIError(WikiJSException):
class ClientError(APIError):
"""Raised for 4xx HTTP status codes (client errors)."""
pass
class ServerError(APIError):
"""Raised for 5xx HTTP status codes (server errors)."""
pass
class NotFoundError(ClientError):
"""Raised when a requested resource is not found (404)."""
pass
class PermissionError(ClientError):
"""Raised when access is forbidden (403)."""
pass
class RateLimitError(ClientError):
"""Raised when rate limit is exceeded (429)."""
def __init__(self, message: str, retry_after: Optional[int] = None, **kwargs):
# Remove status_code from kwargs if present to avoid duplicate argument
kwargs.pop('status_code', None)
kwargs.pop("status_code", None)
super().__init__(message, status_code=429, **kwargs)
self.retry_after = retry_after
class ConnectionError(WikiJSException):
"""Raised when there's a connection issue."""
pass
class TimeoutError(WikiJSException):
"""Raised when a request times out."""
pass
def create_api_error(status_code: int, message: str, response: Any = None) -> APIError:
"""Create appropriate API error based on status code.
Args:
status_code: HTTP status code
message: Error message
response: Raw response object
Returns:
Appropriate APIError subclass instance
"""
@@ -108,4 +100,4 @@ def create_api_error(status_code: int, message: str, response: Any = None) -> AP
elif 500 <= status_code < 600:
return ServerError(message, status_code=status_code, response=response)
else:
return APIError(message, status_code=status_code, response=response)
return APIError(message, status_code=status_code, response=response)

View File

@@ -6,6 +6,6 @@ from .page import Page, PageCreate, PageUpdate
__all__ = [
"BaseModel",
"Page",
"PageCreate",
"PageCreate",
"PageUpdate",
]
]

View File

@@ -3,19 +3,20 @@
from datetime import datetime
from typing import Any, Dict, Optional
from pydantic import BaseModel as PydanticBaseModel, ConfigDict
from pydantic import BaseModel as PydanticBaseModel
from pydantic import ConfigDict
class BaseModel(PydanticBaseModel):
"""Base model with common functionality for all data models.
Provides:
- Automatic validation via Pydantic
- JSON serialization/deserialization
- Field aliases for API compatibility
- Consistent datetime handling
"""
model_config = ConfigDict(
# Allow population by field name or alias
populate_by_name=True,
@@ -26,52 +27,50 @@ class BaseModel(PydanticBaseModel):
# Allow extra fields for forward compatibility
extra="ignore",
# Serialize datetime as ISO format
json_encoders={
datetime: lambda v: v.isoformat() if v else None
}
json_encoders={datetime: lambda v: v.isoformat() if v else None},
)
def to_dict(self, exclude_none: bool = True) -> Dict[str, Any]:
"""Convert model to dictionary.
Args:
exclude_none: Whether to exclude None values
Returns:
Dictionary representation of the model
"""
return self.model_dump(exclude_none=exclude_none, by_alias=True)
def to_json(self, exclude_none: bool = True) -> str:
"""Convert model to JSON string.
Args:
exclude_none: Whether to exclude None values
Returns:
JSON string representation of the model
"""
return self.model_dump_json(exclude_none=exclude_none, by_alias=True)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "BaseModel":
"""Create model instance from dictionary.
Args:
data: Dictionary data
Returns:
Model instance
"""
return cls(**data)
@classmethod
def from_json(cls, json_str: str) -> "BaseModel":
"""Create model instance from JSON string.
Args:
json_str: JSON string
Returns:
Model instance
"""
@@ -80,11 +79,11 @@ class BaseModel(PydanticBaseModel):
class TimestampedModel(BaseModel):
"""Base model with timestamp fields."""
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
@property
def is_new(self) -> bool:
"""Check if this is a new (unsaved) model."""
return self.created_at is None
return self.created_at is None

View File

@@ -1,7 +1,6 @@
"""Page-related data models for wikijs-python-sdk."""
import re
from datetime import datetime
from typing import List, Optional
from pydantic import Field, validator
@@ -11,89 +10,89 @@ from .base import BaseModel, TimestampedModel
class Page(TimestampedModel):
"""Represents a Wiki.js page.
This model contains all the data for a wiki page including
content, metadata, and computed properties.
"""
id: int = Field(..., description="Unique page identifier")
title: str = Field(..., description="Page title")
path: str = Field(..., description="Page path/slug")
content: Optional[str] = Field(None, description="Page content")
# Optional fields that may be present
description: Optional[str] = Field(None, description="Page description")
is_published: bool = Field(True, description="Whether page is published")
is_private: bool = Field(False, description="Whether page is private")
# Metadata
tags: List[str] = Field(default_factory=list, description="Page tags")
locale: str = Field("en", description="Page locale")
# Author information
author_id: Optional[int] = Field(None, alias="authorId")
author_name: Optional[str] = Field(None, alias="authorName")
author_email: Optional[str] = Field(None, alias="authorEmail")
# Editor information
# Editor information
editor: Optional[str] = Field(None, description="Editor used")
@validator("path")
def validate_path(cls, v):
"""Validate page path format."""
if not v:
raise ValueError("Path cannot be empty")
# Remove leading/trailing slashes and normalize
v = v.strip("/")
# Check for valid characters (letters, numbers, hyphens, underscores, slashes)
if not re.match(r"^[a-zA-Z0-9\-_/]+$", v):
raise ValueError("Path contains invalid characters")
return v
@validator("title")
def validate_title(cls, v):
"""Validate page title."""
if not v or not v.strip():
raise ValueError("Title cannot be empty")
# Limit title length
if len(v) > 255:
raise ValueError("Title cannot exceed 255 characters")
return v.strip()
@property
def word_count(self) -> int:
"""Calculate word count from content."""
if not self.content:
return 0
# Simple word count - split on whitespace
words = self.content.split()
return len(words)
@property
def reading_time(self) -> int:
"""Estimate reading time in minutes (assuming 200 words per minute)."""
return max(1, self.word_count // 200)
@property
def url_path(self) -> str:
"""Get the full URL path for this page."""
return f"/{self.path}"
def extract_headings(self) -> List[str]:
"""Extract markdown headings from content.
Returns:
List of heading text (without # markers)
"""
if not self.content:
return []
headings = []
for line in self.content.split("\n"):
line = line.strip()
@@ -102,15 +101,15 @@ class Page(TimestampedModel):
heading = re.sub(r"^#+\s*", "", line).strip()
if heading:
headings.append(heading)
return headings
def has_tag(self, tag: str) -> bool:
"""Check if page has a specific tag.
Args:
tag: Tag to check for
Returns:
True if page has the tag
"""
@@ -119,70 +118,70 @@ class Page(TimestampedModel):
class PageCreate(BaseModel):
"""Data model for creating a new page."""
title: str = Field(..., description="Page title")
path: str = Field(..., description="Page path/slug")
content: str = Field(..., description="Page content")
# Optional fields
description: Optional[str] = Field(None, description="Page description")
is_published: bool = Field(True, description="Whether to publish immediately")
is_private: bool = Field(False, description="Whether page should be private")
tags: List[str] = Field(default_factory=list, description="Page tags")
locale: str = Field("en", description="Page locale")
editor: str = Field("markdown", description="Editor to use")
@validator("path")
def validate_path(cls, v):
"""Validate page path format."""
if not v:
raise ValueError("Path cannot be empty")
# Remove leading/trailing slashes and normalize
v = v.strip("/")
# Check for valid characters
if not re.match(r"^[a-zA-Z0-9\-_/]+$", v):
raise ValueError("Path contains invalid characters")
return v
@validator("title")
def validate_title(cls, v):
"""Validate page title."""
if not v or not v.strip():
raise ValueError("Title cannot be empty")
if len(v) > 255:
raise ValueError("Title cannot exceed 255 characters")
return v.strip()
class PageUpdate(BaseModel):
"""Data model for updating an existing page."""
# All fields optional for partial updates
title: Optional[str] = Field(None, description="Page title")
content: Optional[str] = Field(None, description="Page content")
description: Optional[str] = Field(None, description="Page description")
is_published: Optional[bool] = Field(None, description="Publication status")
is_private: Optional[bool] = Field(None, description="Privacy status")
tags: Optional[List[str]] = Field(None, description="Page tags")
@validator("title")
def validate_title(cls, v):
"""Validate page title if provided."""
if v is not None:
if not v.strip():
raise ValueError("Title cannot be empty")
if len(v) > 255:
raise ValueError("Title cannot exceed 255 characters")
return v.strip()
return v
return v

View File

@@ -1,23 +1,23 @@
"""Utility functions for wikijs-python-sdk."""
from .helpers import (
build_api_url,
chunk_list,
extract_error_message,
normalize_url,
parse_wiki_response,
safe_get,
sanitize_path,
validate_url,
build_api_url,
parse_wiki_response,
extract_error_message,
chunk_list,
safe_get,
)
__all__ = [
"normalize_url",
"sanitize_path",
"sanitize_path",
"validate_url",
"build_api_url",
"parse_wiki_response",
"extract_error_message",
"chunk_list",
"safe_get",
]
]

View File

@@ -1,7 +1,7 @@
"""Helper utilities for wikijs-python-sdk."""
import re
from typing import Any, Dict, Optional
from typing import Any, Dict
from urllib.parse import urljoin, urlparse
from ..exceptions import APIError, ValidationError
@@ -9,37 +9,37 @@ from ..exceptions import APIError, ValidationError
def normalize_url(base_url: str) -> str:
"""Normalize a base URL for API usage.
Args:
base_url: Base URL to normalize
Returns:
Normalized URL without trailing slash
Raises:
ValidationError: If URL is invalid
"""
if not base_url:
raise ValidationError("Base URL cannot be empty")
# Add https:// if no scheme provided
if not base_url.startswith(("http://", "https://")):
base_url = f"https://{base_url}"
# Validate URL format
if not validate_url(base_url):
raise ValidationError(f"Invalid URL format: {base_url}")
# Remove trailing slash
return base_url.rstrip("/")
def validate_url(url: str) -> bool:
"""Validate URL format.
Args:
url: URL to validate
Returns:
True if URL is valid
"""
@@ -52,72 +52,72 @@ def validate_url(url: str) -> bool:
def sanitize_path(path: str) -> str:
"""Sanitize a wiki page path.
Args:
path: Path to sanitize
Returns:
Sanitized path
Raises:
ValidationError: If path is invalid
"""
if not path:
raise ValidationError("Path cannot be empty")
# Remove leading/trailing slashes and whitespace
path = path.strip().strip("/")
# Replace spaces with hyphens
path = re.sub(r"\s+", "-", path)
# Remove invalid characters, keep only alphanumeric, hyphens, underscores, slashes
path = re.sub(r"[^a-zA-Z0-9\-_/]", "", path)
# Remove multiple consecutive hyphens or slashes
path = re.sub(r"[-/]+", lambda m: m.group(0)[0], path)
if not path:
raise ValidationError("Path contains no valid characters")
return path
def build_api_url(base_url: str, endpoint: str) -> str:
"""Build full API URL from base URL and endpoint.
Args:
base_url: Base URL (already normalized)
endpoint: API endpoint path
Returns:
Full API URL
"""
# Ensure endpoint starts with /
if not endpoint.startswith("/"):
endpoint = f"/{endpoint}"
# Wiki.js API is typically at /graphql, but we'll use REST-style for now
api_base = f"{base_url}/api"
return urljoin(api_base, endpoint.lstrip("/"))
def parse_wiki_response(response_data: Dict[str, Any]) -> Dict[str, Any]:
"""Parse Wiki.js API response data.
Args:
response_data: Raw response data from API
Returns:
Parsed response data
Raises:
APIError: If response indicates an error
"""
if not isinstance(response_data, dict):
return response_data
# Check for error indicators
if "error" in response_data:
error_info = response_data["error"]
@@ -127,26 +127,30 @@ def parse_wiki_response(response_data: Dict[str, Any]) -> Dict[str, Any]:
else:
message = str(error_info)
code = None
raise APIError(f"API Error: {message}", details={"code": code})
# Handle GraphQL-style errors
if "errors" in response_data:
errors = response_data["errors"]
if errors:
first_error = errors[0] if isinstance(errors, list) else errors
message = first_error.get("message", "GraphQL error") if isinstance(first_error, dict) else str(first_error)
message = (
first_error.get("message", "GraphQL error")
if isinstance(first_error, dict)
else str(first_error)
)
raise APIError(f"GraphQL Error: {message}", details={"errors": errors})
return response_data
def extract_error_message(response: Any) -> str:
"""Extract error message from response.
Args:
Args:
response: Response object or data
Returns:
Error message string
"""
@@ -160,50 +164,52 @@ def extract_error_message(response: Any) -> str:
return str(data[field])
except Exception:
pass
if hasattr(response, "text"):
return response.text[:200] + "..." if len(response.text) > 200 else response.text
return (
response.text[:200] + "..." if len(response.text) > 200 else response.text
)
return str(response)
def chunk_list(items: list, chunk_size: int) -> list:
"""Split list into chunks of specified size.
Args:
items: List to chunk
chunk_size: Size of each chunk
Returns:
List of chunks
"""
if chunk_size <= 0:
raise ValueError("Chunk size must be positive")
return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
return [items[i : i + chunk_size] for i in range(0, len(items), chunk_size)]
def safe_get(data: Dict[str, Any], key: str, default: Any = None) -> Any:
"""Safely get value from dictionary with dot notation support.
Args:
data: Dictionary to get value from
key: Key (supports dot notation like "user.name")
default: Default value if key not found
Returns:
Value or default
"""
if "." not in key:
return data.get(key, default)
keys = key.split(".")
current = data
for k in keys:
if isinstance(current, dict) and k in current:
current = current[k]
else:
return default
return current
return current

View File

@@ -4,4 +4,4 @@ __version__ = "0.1.0"
__version_info__ = (0, 1, 0)
# Version history
# 0.1.0 - MVP Release: Basic Wiki.js integration with Pages API
# 0.1.0 - MVP Release: Basic Wiki.js integration with Pages API