Merge pull request #1 from l3ocho/claude/review-repository-011CUNdp1kRuTe47fPVBhrBZ

Fix 3 critical issues identified in repository review
This commit is contained in:
Leo Miranda
2025-10-22 13:47:24 -04:00
committed by GitHub
6 changed files with 197 additions and 94 deletions

View File

@@ -13,42 +13,44 @@ from wikijs.exceptions import AuthenticationError
class TestJWTAuth: class TestJWTAuth:
"""Test JWTAuth implementation.""" """Test JWTAuth implementation."""
def test_init_with_valid_token(self, mock_jwt_token): def test_init_with_valid_token(self, mock_jwt_token, mock_wiki_base_url):
"""Test initialization with valid JWT token.""" """Test initialization with valid JWT token."""
auth = JWTAuth(mock_jwt_token) auth = JWTAuth(mock_jwt_token, mock_wiki_base_url)
assert auth._token == mock_jwt_token assert auth._token == mock_jwt_token
assert auth._base_url == mock_wiki_base_url
assert auth._refresh_token is None assert auth._refresh_token is None
assert auth._expires_at is None assert auth._expires_at is None
def test_init_with_all_parameters(self, mock_jwt_token): def test_init_with_all_parameters(self, mock_jwt_token, mock_wiki_base_url):
"""Test initialization with all parameters.""" """Test initialization with all parameters."""
refresh_token = "refresh-token-123" refresh_token = "refresh-token-123"
expires_at = time.time() + 3600 expires_at = time.time() + 3600
auth = JWTAuth(mock_jwt_token, refresh_token, expires_at) auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, refresh_token, expires_at)
assert auth._token == mock_jwt_token assert auth._token == mock_jwt_token
assert auth._base_url == mock_wiki_base_url
assert auth._refresh_token == refresh_token assert auth._refresh_token == refresh_token
assert auth._expires_at == expires_at assert auth._expires_at == expires_at
def test_init_with_whitespace_token(self): def test_init_with_whitespace_token(self, mock_wiki_base_url):
"""Test initialization trims whitespace from token.""" """Test initialization trims whitespace from token."""
auth = JWTAuth(" test-token ") auth = JWTAuth(" test-token ", mock_wiki_base_url)
assert auth._token == "test-token" assert auth._token == "test-token"
def test_init_with_empty_token_raises_error(self): def test_init_with_empty_token_raises_error(self, mock_wiki_base_url):
"""Test that empty JWT token raises ValueError.""" """Test that empty JWT token raises ValueError."""
with pytest.raises(ValueError, match="JWT token cannot be empty"): with pytest.raises(ValueError, match="JWT token cannot be empty"):
JWTAuth("") JWTAuth("", mock_wiki_base_url)
def test_init_with_whitespace_only_token_raises_error(self): def test_init_with_whitespace_only_token_raises_error(self, mock_wiki_base_url):
"""Test that whitespace-only JWT token raises ValueError.""" """Test that whitespace-only JWT token raises ValueError."""
with pytest.raises(ValueError, match="JWT token cannot be empty"): with pytest.raises(ValueError, match="JWT token cannot be empty"):
JWTAuth(" ") JWTAuth(" ", mock_wiki_base_url)
def test_init_with_none_raises_error(self): def test_init_with_none_raises_error(self, mock_wiki_base_url):
"""Test that None JWT token raises ValueError.""" """Test that None JWT token raises ValueError."""
with pytest.raises(ValueError, match="JWT token cannot be empty"): with pytest.raises(ValueError, match="JWT token cannot be empty"):
JWTAuth(None) JWTAuth(None, mock_wiki_base_url)
def test_get_headers_returns_bearer_token(self, jwt_auth, mock_jwt_token): def test_get_headers_returns_bearer_token(self, jwt_auth, mock_jwt_token):
"""Test that get_headers returns proper Authorization header.""" """Test that get_headers returns proper Authorization header."""
@@ -60,17 +62,17 @@ class TestJWTAuth:
} }
assert headers == expected_headers assert headers == expected_headers
def test_get_headers_attempts_refresh_if_invalid(self, mock_jwt_token): def test_get_headers_attempts_refresh_if_invalid(self, mock_jwt_token, mock_wiki_base_url):
"""Test that get_headers attempts refresh if token is invalid.""" """Test that get_headers attempts refresh if token is invalid."""
# Create JWT with expired token # Create JWT with expired token
expires_at = time.time() - 3600 # Expired 1 hour ago expires_at = time.time() - 3600 # Expired 1 hour ago
refresh_token = "refresh-token-123" refresh_token = "refresh-token-123"
auth = JWTAuth(mock_jwt_token, refresh_token, expires_at) auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, refresh_token, expires_at)
# Mock the refresh method to avoid actual implementation # Mock the refresh method to avoid actual implementation
with patch.object(auth, "refresh") as mock_refresh: with patch.object(auth, "refresh") as mock_refresh:
mock_refresh.side_effect = AuthenticationError("Refresh not implemented") mock_refresh.side_effect = AuthenticationError("Refresh failed")
with pytest.raises(AuthenticationError): with pytest.raises(AuthenticationError):
auth.get_headers() auth.get_headers()
@@ -81,28 +83,28 @@ class TestJWTAuth:
"""Test that is_valid returns True for valid token without expiry.""" """Test that is_valid returns True for valid token without expiry."""
assert jwt_auth.is_valid() is True assert jwt_auth.is_valid() is True
def test_is_valid_returns_true_for_non_expired_token(self, mock_jwt_token): def test_is_valid_returns_true_for_non_expired_token(self, mock_jwt_token, mock_wiki_base_url):
"""Test that is_valid returns True for non-expired token.""" """Test that is_valid returns True for non-expired token."""
expires_at = time.time() + 3600 # Expires in 1 hour expires_at = time.time() + 3600 # Expires in 1 hour
auth = JWTAuth(mock_jwt_token, expires_at=expires_at) auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, expires_at=expires_at)
assert auth.is_valid() is True assert auth.is_valid() is True
def test_is_valid_returns_false_for_expired_token(self, mock_jwt_token): def test_is_valid_returns_false_for_expired_token(self, mock_jwt_token, mock_wiki_base_url):
"""Test that is_valid returns False for expired token.""" """Test that is_valid returns False for expired token."""
expires_at = time.time() - 3600 # Expired 1 hour ago expires_at = time.time() - 3600 # Expired 1 hour ago
auth = JWTAuth(mock_jwt_token, expires_at=expires_at) auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, expires_at=expires_at)
assert auth.is_valid() is False assert auth.is_valid() is False
def test_is_valid_considers_refresh_buffer(self, mock_jwt_token): def test_is_valid_considers_refresh_buffer(self, mock_jwt_token, mock_wiki_base_url):
"""Test that is_valid considers refresh buffer.""" """Test that is_valid considers refresh buffer."""
# Token expires in 4 minutes (less than 5 minute buffer) # Token expires in 4 minutes (less than 5 minute buffer)
expires_at = time.time() + 240 expires_at = time.time() + 240
auth = JWTAuth(mock_jwt_token, expires_at=expires_at) auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, expires_at=expires_at)
assert auth.is_valid() is False # Should be invalid due to buffer assert auth.is_valid() is False # Should be invalid due to buffer
def test_is_valid_returns_false_for_empty_token(self, mock_jwt_token): def test_is_valid_returns_false_for_empty_token(self, mock_jwt_token, mock_wiki_base_url):
"""Test that is_valid handles edge cases.""" """Test that is_valid handles edge cases."""
auth = JWTAuth(mock_jwt_token) auth = JWTAuth(mock_jwt_token, mock_wiki_base_url)
auth._token = "" auth._token = ""
assert auth.is_valid() is False assert auth.is_valid() is False
@@ -117,57 +119,60 @@ class TestJWTAuth:
): ):
jwt_auth.refresh() jwt_auth.refresh()
def test_refresh_raises_not_implemented_error(self, mock_jwt_token): def test_refresh_with_refresh_token(self, mock_jwt_token, mock_wiki_base_url):
"""Test that refresh raises not implemented error.""" """Test that refresh is now implemented with refresh token."""
refresh_token = "refresh-token-123" refresh_token = "refresh-token-123"
auth = JWTAuth(mock_jwt_token, refresh_token) auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, refresh_token)
with pytest.raises( # Mock the requests.post call
AuthenticationError, match="JWT token refresh not yet implemented" with patch("requests.post") as mock_post:
): # Simulate failed refresh (network issue)
auth.refresh() mock_post.side_effect = Exception("Network error")
with pytest.raises(AuthenticationError, match="Unexpected error during token refresh"):
auth.refresh()
def test_is_expired_returns_false_no_expiry(self, jwt_auth): def test_is_expired_returns_false_no_expiry(self, jwt_auth):
"""Test that is_expired returns False when no expiry set.""" """Test that is_expired returns False when no expiry set."""
assert jwt_auth.is_expired() is False assert jwt_auth.is_expired() is False
def test_is_expired_returns_false_for_valid_token(self, mock_jwt_token): def test_is_expired_returns_false_for_valid_token(self, mock_jwt_token, mock_wiki_base_url):
"""Test that is_expired returns False for valid token.""" """Test that is_expired returns False for valid token."""
expires_at = time.time() + 3600 # Expires in 1 hour expires_at = time.time() + 3600 # Expires in 1 hour
auth = JWTAuth(mock_jwt_token, expires_at=expires_at) auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, expires_at=expires_at)
assert auth.is_expired() is False assert auth.is_expired() is False
def test_is_expired_returns_true_for_expired_token(self, mock_jwt_token): def test_is_expired_returns_true_for_expired_token(self, mock_jwt_token, mock_wiki_base_url):
"""Test that is_expired returns True for expired token.""" """Test that is_expired returns True for expired token."""
expires_at = time.time() - 3600 # Expired 1 hour ago expires_at = time.time() - 3600 # Expired 1 hour ago
auth = JWTAuth(mock_jwt_token, expires_at=expires_at) auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, expires_at=expires_at)
assert auth.is_expired() is True assert auth.is_expired() is True
def test_time_until_expiry_returns_none_no_expiry(self, jwt_auth): def test_time_until_expiry_returns_none_no_expiry(self, jwt_auth):
"""Test that time_until_expiry returns None when no expiry set.""" """Test that time_until_expiry returns None when no expiry set."""
assert jwt_auth.time_until_expiry() is None assert jwt_auth.time_until_expiry() is None
def test_time_until_expiry_returns_correct_delta(self, mock_jwt_token): def test_time_until_expiry_returns_correct_delta(self, mock_jwt_token, mock_wiki_base_url):
"""Test that time_until_expiry returns correct timedelta.""" """Test that time_until_expiry returns correct timedelta."""
expires_at = time.time() + 3600 # Expires in 1 hour expires_at = time.time() + 3600 # Expires in 1 hour
auth = JWTAuth(mock_jwt_token, expires_at=expires_at) auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, expires_at=expires_at)
time_left = auth.time_until_expiry() time_left = auth.time_until_expiry()
assert isinstance(time_left, timedelta) assert isinstance(time_left, timedelta)
# Should be approximately 1 hour (allowing for small time differences) # Should be approximately 1 hour (allowing for small time differences)
assert 3550 <= time_left.total_seconds() <= 3600 assert 3550 <= time_left.total_seconds() <= 3600
def test_time_until_expiry_returns_zero_for_expired(self, mock_jwt_token): def test_time_until_expiry_returns_zero_for_expired(self, mock_jwt_token, mock_wiki_base_url):
"""Test that time_until_expiry returns zero for expired token.""" """Test that time_until_expiry returns zero for expired token."""
expires_at = time.time() - 3600 # Expired 1 hour ago expires_at = time.time() - 3600 # Expired 1 hour ago
auth = JWTAuth(mock_jwt_token, expires_at=expires_at) auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, expires_at=expires_at)
time_left = auth.time_until_expiry() time_left = auth.time_until_expiry()
assert time_left.total_seconds() == 0 assert time_left.total_seconds() == 0
def test_token_preview_masks_token(self, mock_jwt_token): def test_token_preview_masks_token(self, mock_jwt_token, mock_wiki_base_url):
"""Test that token_preview masks the token for security.""" """Test that token_preview masks the token for security."""
auth = JWTAuth(mock_jwt_token) auth = JWTAuth(mock_jwt_token, mock_wiki_base_url)
preview = auth.token_preview preview = auth.token_preview
assert preview != mock_jwt_token # Should not show full token assert preview != mock_jwt_token # Should not show full token
@@ -175,25 +180,25 @@ class TestJWTAuth:
assert preview.endswith(mock_jwt_token[-10:]) assert preview.endswith(mock_jwt_token[-10:])
assert "..." in preview assert "..." in preview
def test_token_preview_handles_short_token(self): def test_token_preview_handles_short_token(self, mock_wiki_base_url):
"""Test that token_preview handles short tokens.""" """Test that token_preview handles short tokens."""
short_token = "short" short_token = "short"
auth = JWTAuth(short_token) auth = JWTAuth(short_token, mock_wiki_base_url)
preview = auth.token_preview preview = auth.token_preview
assert preview == "*****" # Should be all asterisks assert preview == "*****" # Should be all asterisks
def test_token_preview_handles_none_token(self, mock_jwt_token): def test_token_preview_handles_none_token(self, mock_jwt_token, mock_wiki_base_url):
"""Test that token_preview handles None token.""" """Test that token_preview handles None token."""
auth = JWTAuth(mock_jwt_token) auth = JWTAuth(mock_jwt_token, mock_wiki_base_url)
auth._token = None auth._token = None
assert auth.token_preview == "None" assert auth.token_preview == "None"
def test_repr_shows_masked_token(self, mock_jwt_token): def test_repr_shows_masked_token(self, mock_jwt_token, mock_wiki_base_url):
"""Test that __repr__ shows masked token.""" """Test that __repr__ shows masked token."""
expires_at = time.time() + 3600 expires_at = time.time() + 3600
auth = JWTAuth(mock_jwt_token, expires_at=expires_at) auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, expires_at=expires_at)
repr_str = repr(auth) repr_str = repr(auth)
assert "JWTAuth" in repr_str assert "JWTAuth" in repr_str
@@ -207,12 +212,12 @@ class TestJWTAuth:
jwt_auth.validate_credentials() jwt_auth.validate_credentials()
assert jwt_auth.is_valid() is True assert jwt_auth.is_valid() is True
def test_refresh_token_whitespace_handling(self, mock_jwt_token): def test_refresh_token_whitespace_handling(self, mock_jwt_token, mock_wiki_base_url):
"""Test that refresh token whitespace is handled correctly.""" """Test that refresh token whitespace is handled correctly."""
refresh_token = " refresh-token-123 " refresh_token = " refresh-token-123 "
auth = JWTAuth(mock_jwt_token, refresh_token) auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, refresh_token)
assert auth._refresh_token == "refresh-token-123" assert auth._refresh_token == "refresh-token-123"
# Test None refresh token # Test None refresh token
auth = JWTAuth(mock_jwt_token, None) auth = JWTAuth(mock_jwt_token, mock_wiki_base_url, None)
assert auth._refresh_token is None assert auth._refresh_token is None

View File

@@ -25,9 +25,9 @@ def api_key_auth(mock_api_key):
@pytest.fixture @pytest.fixture
def jwt_auth(mock_jwt_token): def jwt_auth(mock_jwt_token, mock_wiki_base_url):
"""Fixture providing JWTAuth instance.""" """Fixture providing JWTAuth instance."""
return JWTAuth(mock_jwt_token) return JWTAuth(mock_jwt_token, mock_wiki_base_url)
@pytest.fixture @pytest.fixture

View File

@@ -81,40 +81,50 @@ class TestWikiJSClientTestConnection:
"""Mock API key.""" """Mock API key."""
return "test-api-key-12345" return "test-api-key-12345"
@patch("wikijs.client.requests.Session.get") @patch("wikijs.client.requests.Session.request")
def test_test_connection_success(self, mock_get, mock_wiki_base_url, mock_api_key): def test_test_connection_success(self, mock_request, mock_wiki_base_url, mock_api_key):
"""Test successful connection test.""" """Test successful connection test using GraphQL query."""
mock_response = Mock() mock_response = Mock()
mock_response.ok = True
mock_response.status_code = 200 mock_response.status_code = 200
mock_get.return_value = mock_response mock_response.json.return_value = {
"data": {
"site": {
"title": "Test Wiki"
}
}
}
mock_request.return_value = mock_response
client = WikiJSClient(mock_wiki_base_url, auth=mock_api_key) client = WikiJSClient(mock_wiki_base_url, auth=mock_api_key)
result = client.test_connection() result = client.test_connection()
assert result is True assert result is True
# Verify it made a POST request to GraphQL endpoint
mock_request.assert_called_once()
@patch("wikijs.client.requests.Session.get") @patch("wikijs.client.requests.Session.request")
def test_test_connection_timeout(self, mock_get, mock_wiki_base_url, mock_api_key): def test_test_connection_timeout(self, mock_request, mock_wiki_base_url, mock_api_key):
"""Test connection test timeout.""" """Test connection test timeout."""
import requests import requests
mock_get.side_effect = requests.exceptions.Timeout("Request timed out") mock_request.side_effect = requests.exceptions.Timeout("Request timed out")
client = WikiJSClient(mock_wiki_base_url, auth=mock_api_key) client = WikiJSClient(mock_wiki_base_url, auth=mock_api_key)
with pytest.raises(TimeoutError, match="Connection test timed out"): with pytest.raises(TimeoutError):
client.test_connection() client.test_connection()
@patch("wikijs.client.requests.Session.get") @patch("wikijs.client.requests.Session.request")
def test_test_connection_error(self, mock_get, mock_wiki_base_url, mock_api_key): def test_test_connection_error(self, mock_request, mock_wiki_base_url, mock_api_key):
"""Test connection test with connection error.""" """Test connection test with connection error."""
import requests import requests
mock_get.side_effect = requests.exceptions.ConnectionError("Connection failed") mock_request.side_effect = requests.exceptions.ConnectionError("Connection failed")
client = WikiJSClient(mock_wiki_base_url, auth=mock_api_key) client = WikiJSClient(mock_wiki_base_url, auth=mock_api_key)
with pytest.raises(ConnectionError, match="Cannot connect"): with pytest.raises(ConnectionError):
client.test_connection() client.test_connection()
def test_test_connection_no_base_url(self): def test_test_connection_no_base_url(self):
@@ -336,7 +346,7 @@ class TestWikiJSClientContextManager:
mock_session_class.return_value = mock_session mock_session_class.return_value = mock_session
# Mock generic exception during connection test # Mock generic exception during connection test
mock_session.get.side_effect = RuntimeError("Unexpected error") mock_session.request.side_effect = RuntimeError("Unexpected error")
client = WikiJSClient("https://wiki.example.com", auth="test-key") client = WikiJSClient("https://wiki.example.com", auth="test-key")

View File

@@ -19,17 +19,23 @@ class JWTAuth(AuthHandler):
Args: Args:
token: The JWT token string. token: The JWT token string.
base_url: The base URL of the Wiki.js instance (needed for token refresh).
refresh_token: Optional refresh token for automatic renewal. refresh_token: Optional refresh token for automatic renewal.
expires_at: Optional expiration timestamp (Unix timestamp). expires_at: Optional expiration timestamp (Unix timestamp).
Example: Example:
>>> auth = JWTAuth("eyJ0eXAiOiJKV1QiLCJhbGc...") >>> auth = JWTAuth(
... token="eyJ0eXAiOiJKV1QiLCJhbGc...",
... base_url="https://wiki.example.com",
... refresh_token="refresh_token_here"
... )
>>> client = WikiJSClient("https://wiki.example.com", auth=auth) >>> client = WikiJSClient("https://wiki.example.com", auth=auth)
""" """
def __init__( def __init__(
self, self,
token: str, token: str,
base_url: str,
refresh_token: Optional[str] = None, refresh_token: Optional[str] = None,
expires_at: Optional[float] = None, expires_at: Optional[float] = None,
) -> None: ) -> None:
@@ -37,16 +43,21 @@ class JWTAuth(AuthHandler):
Args: Args:
token: The JWT token string. token: The JWT token string.
base_url: The base URL of the Wiki.js instance.
refresh_token: Optional refresh token for automatic renewal. refresh_token: Optional refresh token for automatic renewal.
expires_at: Optional expiration timestamp (Unix timestamp). expires_at: Optional expiration timestamp (Unix timestamp).
Raises: Raises:
ValueError: If token is empty or None. ValueError: If token or base_url is empty or None.
""" """
if not token or not token.strip(): if not token or not token.strip():
raise ValueError("JWT token cannot be empty") raise ValueError("JWT token cannot be empty")
if not base_url or not base_url.strip():
raise ValueError("Base URL cannot be empty")
self._token = token.strip() self._token = token.strip()
self._base_url = base_url.strip().rstrip("/")
self._refresh_token = refresh_token.strip() if refresh_token else None self._refresh_token = refresh_token.strip() if refresh_token else None
self._expires_at = expires_at self._expires_at = expires_at
self._refresh_buffer = 300 # Refresh 5 minutes before expiration self._refresh_buffer = 300 # Refresh 5 minutes before expiration
@@ -91,15 +102,13 @@ class JWTAuth(AuthHandler):
def refresh(self) -> None: def refresh(self) -> None:
"""Refresh the JWT token using the refresh token. """Refresh the JWT token using the refresh token.
This method attempts to refresh the JWT token using the refresh token. This method attempts to refresh the JWT token using the refresh token
If no refresh token is available, it raises an AuthenticationError. by making a request to the Wiki.js authentication endpoint.
Note: This is a placeholder implementation. In a real implementation,
this would make an HTTP request to the Wiki.js token refresh endpoint.
Raises: Raises:
AuthenticationError: If refresh token is not available or refresh fails. AuthenticationError: If refresh token is not available or refresh fails.
""" """
import requests
from ..exceptions import AuthenticationError from ..exceptions import AuthenticationError
if not self._refresh_token: if not self._refresh_token:
@@ -107,16 +116,57 @@ class JWTAuth(AuthHandler):
"JWT token expired and no refresh token available" "JWT token expired and no refresh token available"
) )
# TODO: Implement actual token refresh logic try:
# This would typically involve: # Make request to Wiki.js token refresh endpoint
# 1. Making a POST request to /auth/refresh endpoint refresh_url = f"{self._base_url}/api/auth/refresh"
# 2. Sending the refresh token
# 3. Updating self._token and self._expires_at with the response
raise AuthenticationError( response = requests.post(
"JWT token refresh not yet implemented. " refresh_url,
"Please provide a new token or use API key authentication." json={"refreshToken": self._refresh_token},
) headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
timeout=30,
)
# Check if request was successful
if not response.ok:
error_msg = "Token refresh failed"
try:
error_data = response.json()
error_msg = error_data.get("message", error_msg)
except Exception:
pass
raise AuthenticationError(
f"Failed to refresh JWT token: {error_msg} (HTTP {response.status_code})"
)
# Parse response
data = response.json()
# Update token and expiration
if "token" in data:
self._token = data["token"]
if "expiresAt" in data:
self._expires_at = data["expiresAt"]
elif "expires_at" in data:
self._expires_at = data["expires_at"]
# Optionally update refresh token if a new one is provided
if "refreshToken" in data:
self._refresh_token = data["refreshToken"]
except requests.exceptions.RequestException as e:
raise AuthenticationError(
f"Failed to refresh JWT token: {str(e)}"
) from e
except Exception as e:
raise AuthenticationError(
f"Unexpected error during token refresh: {str(e)}"
) from e
def is_expired(self) -> bool: def is_expired(self) -> bool:
"""Check if the JWT token is expired. """Check if the JWT token is expired.

View File

@@ -23,6 +23,7 @@ from .utils import (
normalize_url, normalize_url,
parse_wiki_response, parse_wiki_response,
) )
from .version import __version__
class WikiJSClient: class WikiJSClient:
@@ -82,7 +83,7 @@ class WikiJSClient:
# Request configuration # Request configuration
self.timeout = timeout self.timeout = timeout
self.verify_ssl = verify_ssl self.verify_ssl = verify_ssl
self.user_agent = user_agent or "wikijs-python-sdk/0.1.0" self.user_agent = user_agent or f"wikijs-python-sdk/{__version__}"
# Initialize HTTP session # Initialize HTTP session
self._session = self._create_session() self._session = self._create_session()
@@ -229,6 +230,9 @@ class WikiJSClient:
def test_connection(self) -> bool: def test_connection(self) -> bool:
"""Test connection to Wiki.js instance. """Test connection to Wiki.js instance.
This method validates the connection by making an actual GraphQL query
to the Wiki.js API, ensuring both connectivity and authentication work.
Returns: Returns:
True if connection successful True if connection successful
@@ -236,6 +240,7 @@ class WikiJSClient:
ConfigurationError: If client is not properly configured ConfigurationError: If client is not properly configured
ConnectionError: If cannot connect to server ConnectionError: If cannot connect to server
AuthenticationError: If authentication fails AuthenticationError: If authentication fails
TimeoutError: If connection test times out
""" """
if not self.base_url: if not self.base_url:
raise ConfigurationError("Base URL not configured") raise ConfigurationError("Base URL not configured")
@@ -244,20 +249,47 @@ class WikiJSClient:
raise ConfigurationError("Authentication not configured") raise ConfigurationError("Authentication not configured")
try: try:
# Try to hit a basic endpoint (will implement with actual endpoints) # Test with minimal GraphQL query to validate API access
# For now, just test basic connectivity query = """
self._session.get( query {
self.base_url, timeout=self.timeout, verify=self.verify_ssl site {
) title
}
}
"""
response = self._request("POST", "/graphql", json_data={"query": query})
# Check for GraphQL errors
if "errors" in response:
error_msg = response["errors"][0].get("message", "Unknown error")
raise AuthenticationError(
f"GraphQL query failed: {error_msg}"
)
# Verify we got expected data structure
if "data" not in response or "site" not in response["data"]:
raise APIError(
"Unexpected response format from Wiki.js API"
)
return True return True
except requests.exceptions.Timeout: except AuthenticationError:
raise TimeoutError( # Re-raise authentication errors as-is
f"Connection test timed out after {self.timeout} seconds" raise
)
except requests.exceptions.ConnectionError as e: except TimeoutError:
raise ConnectionError(f"Cannot connect to {self.base_url}: {str(e)}") # Re-raise timeout errors as-is
raise
except ConnectionError:
# Re-raise connection errors as-is
raise
except APIError:
# Re-raise API errors as-is
raise
except Exception as e: except Exception as e:
raise ConnectionError(f"Connection test failed: {str(e)}") raise ConnectionError(f"Connection test failed: {str(e)}")

View File

@@ -72,13 +72,19 @@ class RateLimitError(ClientError):
self.retry_after = retry_after self.retry_after = retry_after
class ConnectionError(WikiJSException): class ConnectionError(APIError):
"""Raised when there's a connection issue.""" """Raised when there's a connection issue."""
def __init__(self, message: str, details: Optional[Dict[str, Any]] = None) -> None:
super().__init__(message, status_code=None, response=None, details=details)
class TimeoutError(WikiJSException):
class TimeoutError(APIError):
"""Raised when a request times out.""" """Raised when a request times out."""
def __init__(self, message: str, details: Optional[Dict[str, Any]] = None) -> None:
super().__init__(message, status_code=None, response=None, details=details)
def create_api_error(status_code: int, message: str, response: Any = None) -> APIError: def create_api_error(status_code: int, message: str, response: Any = None) -> APIError:
"""Create appropriate API error based on status code. """Create appropriate API error based on status code.