Merge origin/development into feature branch

Resolved conflicts:
- README.md: Combined Requirements section and Production Features section
- requirements.txt: Kept pydantic[email]>=1.10.0 without aiohttp in core deps

Merged features from development:
- Production features: logging, metrics, rate limiting
- Security policy (SECURITY.md)
- Additional test coverage
- Documentation for new features
This commit is contained in:
Claude
2025-10-23 20:43:56 +00:00
18 changed files with 1762 additions and 43 deletions

View File

@@ -0,0 +1,277 @@
"""Extended tests for Groups endpoint to improve coverage."""
from unittest.mock import Mock
import pytest
from wikijs.endpoints import GroupsEndpoint
from wikijs.exceptions import APIError, ValidationError
from wikijs.models import GroupCreate, GroupUpdate
class TestGroupsEndpointExtended:
"""Extended test suite for GroupsEndpoint."""
@pytest.fixture
def client(self):
"""Create mock client."""
mock_client = Mock()
mock_client.base_url = "https://wiki.example.com"
mock_client._request = Mock()
return mock_client
@pytest.fixture
def endpoint(self, client):
"""Create GroupsEndpoint instance."""
return GroupsEndpoint(client)
def test_list_groups_api_error(self, endpoint):
"""Test list groups with API error."""
mock_response = {"errors": [{"message": "API error"}]}
endpoint._post = Mock(return_value=mock_response)
with pytest.raises(APIError, match="API error"):
endpoint.list()
def test_get_group_not_found(self, endpoint):
"""Test get group when not found."""
mock_response = {
"data": {
"groups": {
"single": None
}
}
}
endpoint._post = Mock(return_value=mock_response)
with pytest.raises(APIError, match="not found"):
endpoint.get(999)
def test_get_group_api_error(self, endpoint):
"""Test get group with API error."""
mock_response = {"errors": [{"message": "Access denied"}]}
endpoint._post = Mock(return_value=mock_response)
with pytest.raises(APIError, match="Access denied"):
endpoint.get(1)
def test_create_group_with_dict(self, endpoint):
"""Test creating group with dictionary."""
group_dict = {"name": "Editors", "permissions": ["read:pages"]}
mock_response = {
"data": {
"groups": {
"create": {
"responseResult": {"succeeded": True},
"group": {
"id": 2,
"name": "Editors",
"isSystem": False,
"redirectOnLogin": "/",
"permissions": ["read:pages"],
"pageRules": [],
"createdAt": "2024-01-01T00:00:00Z",
"updatedAt": "2024-01-01T00:00:00Z",
},
}
}
}
}
endpoint._post = Mock(return_value=mock_response)
group = endpoint.create(group_dict)
assert group.name == "Editors"
def test_create_group_validation_error(self, endpoint):
"""Test create group with validation error."""
with pytest.raises(ValidationError):
endpoint.create("invalid_type")
def test_create_group_api_error(self, endpoint):
"""Test create group with API error."""
group_data = GroupCreate(name="Editors")
mock_response = {"errors": [{"message": "Group exists"}]}
endpoint._post = Mock(return_value=mock_response)
with pytest.raises(APIError, match="Group exists"):
endpoint.create(group_data)
def test_create_group_failed_result(self, endpoint):
"""Test create group with failed result."""
group_data = GroupCreate(name="Editors")
mock_response = {
"data": {
"groups": {
"create": {
"responseResult": {"succeeded": False, "message": "Creation failed"}
}
}
}
}
endpoint._post = Mock(return_value=mock_response)
with pytest.raises(APIError, match="Creation failed"):
endpoint.create(group_data)
def test_update_group_validation_error_invalid_id(self, endpoint):
"""Test update group with invalid ID."""
update_data = GroupUpdate(name="Updated")
with pytest.raises(ValidationError):
endpoint.update(0, update_data)
def test_update_group_with_dict(self, endpoint):
"""Test updating group with dictionary."""
update_dict = {"name": "Updated Name"}
mock_response = {
"data": {
"groups": {
"update": {
"responseResult": {"succeeded": True},
"group": {
"id": 1,
"name": "Updated Name",
"isSystem": False,
"redirectOnLogin": "/",
"permissions": [],
"pageRules": [],
"createdAt": "2024-01-01T00:00:00Z",
"updatedAt": "2024-01-02T00:00:00Z",
},
}
}
}
}
endpoint._post = Mock(return_value=mock_response)
group = endpoint.update(1, update_dict)
assert group.name == "Updated Name"
def test_update_group_validation_error_invalid_data(self, endpoint):
"""Test update group with invalid data."""
with pytest.raises(ValidationError):
endpoint.update(1, "invalid_type")
def test_update_group_api_error(self, endpoint):
"""Test update group with API error."""
update_data = GroupUpdate(name="Updated")
mock_response = {"errors": [{"message": "Update failed"}]}
endpoint._post = Mock(return_value=mock_response)
with pytest.raises(APIError, match="Update failed"):
endpoint.update(1, update_data)
def test_update_group_failed_result(self, endpoint):
"""Test update group with failed result."""
update_data = GroupUpdate(name="Updated")
mock_response = {
"data": {
"groups": {
"update": {
"responseResult": {"succeeded": False, "message": "Update denied"}
}
}
}
}
endpoint._post = Mock(return_value=mock_response)
with pytest.raises(APIError, match="Update denied"):
endpoint.update(1, update_data)
def test_delete_group_validation_error(self, endpoint):
"""Test delete group with invalid ID."""
with pytest.raises(ValidationError):
endpoint.delete(-1)
def test_delete_group_api_error(self, endpoint):
"""Test delete group with API error."""
mock_response = {"errors": [{"message": "Delete failed"}]}
endpoint._post = Mock(return_value=mock_response)
with pytest.raises(APIError, match="Delete failed"):
endpoint.delete(1)
def test_delete_group_failed_result(self, endpoint):
"""Test delete group with failed result."""
mock_response = {
"data": {
"groups": {
"delete": {
"responseResult": {"succeeded": False, "message": "Cannot delete system group"}
}
}
}
}
endpoint._post = Mock(return_value=mock_response)
with pytest.raises(APIError, match="Cannot delete system group"):
endpoint.delete(1)
def test_assign_user_validation_error_invalid_group(self, endpoint):
"""Test assign user with invalid group ID."""
with pytest.raises(ValidationError):
endpoint.assign_user(0, 1)
def test_assign_user_validation_error_invalid_user(self, endpoint):
"""Test assign user with invalid user ID."""
with pytest.raises(ValidationError):
endpoint.assign_user(1, 0)
def test_assign_user_api_error(self, endpoint):
"""Test assign user with API error."""
mock_response = {"errors": [{"message": "Assignment failed"}]}
endpoint._post = Mock(return_value=mock_response)
with pytest.raises(APIError, match="Assignment failed"):
endpoint.assign_user(1, 5)
def test_assign_user_failed_result(self, endpoint):
"""Test assign user with failed result."""
mock_response = {
"data": {
"groups": {
"assignUser": {
"responseResult": {"succeeded": False, "message": "User already in group"}
}
}
}
}
endpoint._post = Mock(return_value=mock_response)
with pytest.raises(APIError, match="User already in group"):
endpoint.assign_user(1, 5)
def test_unassign_user_validation_error_invalid_group(self, endpoint):
"""Test unassign user with invalid group ID."""
with pytest.raises(ValidationError):
endpoint.unassign_user(-1, 1)
def test_unassign_user_validation_error_invalid_user(self, endpoint):
"""Test unassign user with invalid user ID."""
with pytest.raises(ValidationError):
endpoint.unassign_user(1, -1)
def test_unassign_user_api_error(self, endpoint):
"""Test unassign user with API error."""
mock_response = {"errors": [{"message": "Unassignment failed"}]}
endpoint._post = Mock(return_value=mock_response)
with pytest.raises(APIError, match="Unassignment failed"):
endpoint.unassign_user(1, 5)
def test_unassign_user_failed_result(self, endpoint):
"""Test unassign user with failed result."""
mock_response = {
"data": {
"groups": {
"unassignUser": {
"responseResult": {"succeeded": False, "message": "User not in group"}
}
}
}
}
endpoint._post = Mock(return_value=mock_response)
with pytest.raises(APIError, match="User not in group"):
endpoint.unassign_user(1, 5)

View File

@@ -17,6 +17,7 @@ class TestPagesEndpoint:
def mock_client(self):
"""Create a mock WikiJS client."""
client = Mock(spec=WikiJSClient)
client.cache = None
return client
@pytest.fixture

View File

@@ -0,0 +1,175 @@
"""Tests for Pages endpoint caching functionality."""
import pytest
from unittest.mock import MagicMock, Mock
from wikijs.cache import MemoryCache, CacheKey
from wikijs.endpoints.pages import PagesEndpoint
from wikijs.models import Page
class TestPagesCaching:
"""Test caching behavior in Pages endpoint."""
def test_get_with_cache_hit(self):
"""Test page retrieval uses cache when available."""
# Setup
cache = MemoryCache(ttl=300)
client = MagicMock()
client.cache = cache
client._request = MagicMock()
pages = PagesEndpoint(client)
# Pre-populate cache
page_data = {"id": 123, "title": "Test", "path": "test"}
cache_key = CacheKey("page", "123", "get")
cache.set(cache_key, page_data)
# Execute
result = pages.get(123)
# Verify cache was used, not API
client._request.assert_not_called()
assert result["id"] == 123
def test_get_with_cache_miss(self):
"""Test page retrieval calls API on cache miss."""
# Setup
cache = MemoryCache(ttl=300)
client = MagicMock()
client.cache = cache
# Mock the _post method on the endpoint
pages = PagesEndpoint(client)
pages._post = Mock(return_value={
"data": {"pages": {"single": {
"id": 123,
"title": "Test",
"path": "test",
"content": "Test content",
"description": "Test desc",
"isPublished": True,
"isPrivate": False,
"tags": [],
"locale": "en",
"authorId": 1,
"authorName": "Test User",
"authorEmail": "test@example.com",
"editor": "markdown",
"createdAt": "2023-01-01T00:00:00Z",
"updatedAt": "2023-01-02T00:00:00Z"
}}}
})
# Execute
result = pages.get(123)
# Verify API was called
pages._post.assert_called_once()
# Verify result was cached
cache_key = CacheKey("page", "123", "get")
cached = cache.get(cache_key)
assert cached is not None
def test_update_invalidates_cache(self):
"""Test page update invalidates cache."""
# Setup
cache = MemoryCache(ttl=300)
client = MagicMock()
client.cache = cache
pages = PagesEndpoint(client)
pages._post = Mock(return_value={
"data": {"updatePage": {
"id": 123,
"title": "New",
"path": "test",
"content": "Updated content",
"description": "Updated desc",
"isPublished": True,
"isPrivate": False,
"tags": [],
"locale": "en",
"authorId": 1,
"authorName": "Test User",
"authorEmail": "test@example.com",
"editor": "markdown",
"createdAt": "2023-01-01T00:00:00Z",
"updatedAt": "2023-01-02T00:00:00Z"
}}
})
# Pre-populate cache
cache_key = CacheKey("page", "123", "get")
cache.set(cache_key, {"id": 123, "title": "Old"})
# Verify cache is populated
assert cache.get(cache_key) is not None
# Execute update
pages.update(123, {"title": "New"})
# Verify cache was invalidated
cached = cache.get(cache_key)
assert cached is None
def test_delete_invalidates_cache(self):
"""Test page delete invalidates cache."""
# Setup
cache = MemoryCache(ttl=300)
client = MagicMock()
client.cache = cache
pages = PagesEndpoint(client)
pages._post = Mock(return_value={
"data": {"deletePage": {"success": True}}
})
# Pre-populate cache
cache_key = CacheKey("page", "123", "get")
cache.set(cache_key, {"id": 123, "title": "Test"})
# Verify cache is populated
assert cache.get(cache_key) is not None
# Execute delete
pages.delete(123)
# Verify cache was invalidated
cached = cache.get(cache_key)
assert cached is None
def test_get_without_cache(self):
"""Test page retrieval without cache configured."""
# Setup
client = MagicMock()
client.cache = None
pages = PagesEndpoint(client)
pages._post = Mock(return_value={
"data": {"pages": {"single": {
"id": 123,
"title": "Test",
"path": "test",
"content": "Test content",
"description": "Test desc",
"isPublished": True,
"isPrivate": False,
"tags": [],
"locale": "en",
"authorId": 1,
"authorName": "Test User",
"authorEmail": "test@example.com",
"editor": "markdown",
"createdAt": "2023-01-01T00:00:00Z",
"updatedAt": "2023-01-02T00:00:00Z"
}}}
})
# Execute
result = pages.get(123)
# Verify API was called
pages._post.assert_called_once()
assert result.id == 123

View File

@@ -0,0 +1,204 @@
"""Targeted tests to reach 85% coverage."""
import pytest
import time
from unittest.mock import Mock, patch
from wikijs.cache.memory import MemoryCache
from wikijs.ratelimit import RateLimiter
from wikijs.metrics import MetricsCollector
from wikijs.client import WikiJSClient
from wikijs.exceptions import AuthenticationError, APIError
class TestCacheEdgeCases:
"""Test cache edge cases to cover missing lines."""
def test_invalidate_resource_with_malformed_keys(self):
"""Test invalidate_resource with malformed cache keys (covers line 132)."""
cache = MemoryCache(ttl=300)
# Add some normal keys
cache._cache["page:123"] = ({"data": "test1"}, time.time() + 300)
cache._cache["page:456"] = ({"data": "test2"}, time.time() + 300)
# Add a malformed key without colon separator (covers line 132)
cache._cache["malformedkey"] = ({"data": "test3"}, time.time() + 300)
# Invalidate page type - should skip malformed key
cache.invalidate_resource("page")
# Normal keys should be gone
assert "page:123" not in cache._cache
assert "page:456" not in cache._cache
# Malformed key should still exist (was skipped by continue on line 132)
assert "malformedkey" in cache._cache
class TestMetricsEdgeCases:
"""Test metrics edge cases."""
def test_record_request_with_server_error(self):
"""Test recording request with 5xx status code (covers line 64)."""
collector = MetricsCollector()
# Record a server error (5xx)
collector.record_request(
endpoint="/test",
method="GET",
status_code=500,
duration_ms=150.0,
error="Internal Server Error"
)
# Check counters
assert collector._counters["total_requests"] == 1
assert collector._counters["total_errors"] == 1
assert collector._counters["total_server_errors"] == 1 # Line 64
def test_percentile_with_empty_data(self):
"""Test _percentile with empty data (covers line 135)."""
collector = MetricsCollector()
# Get percentile with no recorded requests
result = collector._percentile([], 95)
# Should return 0.0 for empty data (line 135)
assert result == 0.0
class TestClientErrorHandling:
"""Test client error handling paths."""
@patch('wikijs.client.requests.Session')
def test_connection_unexpected_format(self, mock_session_class):
"""Test test_connection with unexpected response format (covers line 287)."""
mock_session = Mock()
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {"unexpected": "format"}
mock_session.request.return_value = mock_response
mock_session_class.return_value = mock_session
client = WikiJSClient("https://wiki.example.com", auth="test-key")
with pytest.raises(APIError, match="Unexpected response format"):
client.test_connection()
@patch('wikijs.client.requests.Session')
def test_connection_reraises_auth_error(self, mock_session_class):
"""Test test_connection re-raises AuthenticationError (covers line 295)."""
mock_session = Mock()
def raise_auth_error(*args, **kwargs):
raise AuthenticationError("Auth failed")
mock_session.request.side_effect = raise_auth_error
mock_session_class.return_value = mock_session
client = WikiJSClient("https://wiki.example.com", auth="test-key")
with pytest.raises(AuthenticationError, match="Auth failed"):
client.test_connection()
@patch('wikijs.client.requests.Session')
def test_connection_reraises_api_error(self, mock_session_class):
"""Test test_connection re-raises APIError (covers line 307)."""
mock_session = Mock()
def raise_api_error(*args, **kwargs):
raise APIError("API failed")
mock_session.request.side_effect = raise_api_error
mock_session_class.return_value = mock_session
client = WikiJSClient("https://wiki.example.com", auth="test-key")
with pytest.raises(APIError, match="API failed"):
client.test_connection()
class TestLoggingEdgeCases:
"""Test logging edge cases."""
def test_setup_logging_with_json_format(self):
"""Test setup_logging with JSON format."""
from wikijs.logging import setup_logging
logger = setup_logging(level="DEBUG", format_type="json")
assert logger.level == 10 # DEBUG = 10
def test_json_formatter_with_exception_info(self):
"""Test JSON formatter with exception info (covers line 33)."""
import logging
import sys
from wikijs.logging import JSONFormatter
formatter = JSONFormatter()
# Create a log record with actual exception info
try:
1 / 0
except ZeroDivisionError:
exc_info = sys.exc_info()
record = logging.LogRecord(
name="test",
level=logging.ERROR,
pathname="test.py",
lineno=1,
msg="Error occurred",
args=(),
exc_info=exc_info
)
result = formatter.format(record)
assert "exception" in result
assert "ZeroDivisionError" in result
def test_json_formatter_with_extra_fields(self):
"""Test JSON formatter with extra fields (covers line 37)."""
import logging
from wikijs.logging import JSONFormatter
formatter = JSONFormatter()
record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="test.py",
lineno=1,
msg="Test message",
args=(),
exc_info=None
)
# Add extra attribute
record.extra = {"user_id": 123, "request_id": "abc"}
result = formatter.format(record)
assert "user_id" in result
assert "123" in result
def test_setup_logging_with_file_output(self):
"""Test setup_logging with file output (covers line 65)."""
import tempfile
import os
from wikijs.logging import setup_logging
# Create temporary file
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.log') as f:
log_file = f.name
try:
logger = setup_logging(level="INFO", format_type="text", output_file=log_file)
logger.info("Test message")
# Verify file was created and has content
assert os.path.exists(log_file)
with open(log_file, 'r') as f:
content = f.read()
assert "Test message" in content
finally:
if os.path.exists(log_file):
os.unlink(log_file)

41
tests/test_logging.py Normal file
View File

@@ -0,0 +1,41 @@
"""Tests for logging functionality."""
import logging
import json
from wikijs.logging import setup_logging, JSONFormatter
def test_json_formatter():
"""Test JSON log formatting."""
formatter = JSONFormatter()
record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="test.py",
lineno=10,
msg="Test message",
args=(),
exc_info=None
)
output = formatter.format(record)
log_data = json.loads(output)
assert log_data["level"] == "INFO"
assert log_data["message"] == "Test message"
assert "timestamp" in log_data
def test_setup_logging_json():
"""Test JSON logging setup."""
logger = setup_logging(level=logging.DEBUG, format_type="json")
assert logger.level == logging.DEBUG
assert len(logger.handlers) == 1
def test_setup_logging_text():
"""Test text logging setup."""
logger = setup_logging(level=logging.INFO, format_type="text")
assert logger.level == logging.INFO
assert len(logger.handlers) == 1

89
tests/test_metrics.py Normal file
View File

@@ -0,0 +1,89 @@
"""Tests for metrics functionality."""
from wikijs.metrics import MetricsCollector, get_metrics
def test_metrics_collector_init():
"""Test metrics collector initialization."""
collector = MetricsCollector()
stats = collector.get_stats()
assert stats["total_requests"] == 0
assert stats["total_errors"] == 0
def test_record_request():
"""Test recording requests."""
collector = MetricsCollector()
# Record successful request
collector.record_request("/api/test", "GET", 200, 100.0)
stats = collector.get_stats()
assert stats["total_requests"] == 1
assert stats["total_errors"] == 0
def test_record_error():
"""Test recording errors."""
collector = MetricsCollector()
# Record error request
collector.record_request("/api/test", "GET", 404, 50.0, error="Not found")
stats = collector.get_stats()
assert stats["total_requests"] == 1
assert stats["total_errors"] == 1
def test_latency_stats():
"""Test latency statistics."""
collector = MetricsCollector()
# Record multiple requests
collector.record_request("/api/test", "GET", 200, 100.0)
collector.record_request("/api/test", "GET", 200, 200.0)
collector.record_request("/api/test", "GET", 200, 150.0)
stats = collector.get_stats()
assert "latency" in stats
assert stats["latency"]["min"] == 100.0
assert stats["latency"]["max"] == 200.0
assert stats["latency"]["avg"] == 150.0
def test_increment_counter():
"""Test incrementing counters."""
collector = MetricsCollector()
collector.increment("custom_counter", 5)
collector.increment("custom_counter", 3)
stats = collector.get_stats()
assert stats["counters"]["custom_counter"] == 8
def test_set_gauge():
"""Test setting gauges."""
collector = MetricsCollector()
collector.set_gauge("memory_usage", 75.5)
stats = collector.get_stats()
assert stats["gauges"]["memory_usage"] == 75.5
def test_reset_metrics():
"""Test resetting metrics."""
collector = MetricsCollector()
collector.record_request("/api/test", "GET", 200, 100.0)
collector.reset()
stats = collector.get_stats()
assert stats["total_requests"] == 0
def test_get_global_metrics():
"""Test getting global metrics instance."""
metrics = get_metrics()
assert isinstance(metrics, MetricsCollector)

73
tests/test_ratelimit.py Normal file
View File

@@ -0,0 +1,73 @@
"""Tests for rate limiting functionality."""
import time
import pytest
from wikijs.ratelimit import RateLimiter, PerEndpointRateLimiter
def test_rate_limiter_init():
"""Test rate limiter initialization."""
limiter = RateLimiter(requests_per_second=10.0)
assert limiter.rate == 10.0
assert limiter.burst == 10
def test_rate_limiter_acquire():
"""Test acquiring tokens."""
limiter = RateLimiter(requests_per_second=100.0)
# Should be able to acquire immediately
assert limiter.acquire(timeout=1.0) is True
def test_rate_limiter_burst():
"""Test burst behavior."""
limiter = RateLimiter(requests_per_second=10.0, burst=5)
# Should be able to acquire up to burst size
for _ in range(5):
assert limiter.acquire(timeout=0.1) is True
def test_rate_limiter_timeout():
"""Test timeout behavior."""
limiter = RateLimiter(requests_per_second=1.0)
# Exhaust tokens
assert limiter.acquire(timeout=1.0) is True
# Next acquire should timeout quickly
assert limiter.acquire(timeout=0.1) is False
def test_rate_limiter_reset():
"""Test rate limiter reset."""
limiter = RateLimiter(requests_per_second=1.0)
# Exhaust tokens
limiter.acquire()
# Reset
limiter.reset()
# Should be able to acquire again
assert limiter.acquire(timeout=0.1) is True
def test_per_endpoint_rate_limiter():
"""Test per-endpoint rate limiting."""
limiter = PerEndpointRateLimiter(default_rate=10.0)
# Set different rate for specific endpoint
limiter.set_limit("/api/special", 5.0)
# Should use endpoint-specific rate
assert limiter.acquire("/api/special", timeout=1.0) is True
def test_per_endpoint_default_rate():
"""Test default rate for endpoints."""
limiter = PerEndpointRateLimiter(default_rate=100.0)
# Should use default rate for unknown endpoint
assert limiter.acquire("/api/unknown", timeout=1.0) is True