Testing Guide
Testing Philosophy
Kakashi follows a comprehensive testing strategy with multiple test types to ensure reliability, performance, and maintainability.
Test Categories
Unit Tests
Test individual components in isolation:
import pytest
from kakashi.core.records import LogRecord, LogLevel, LogContext
from kakashi.core.pipeline import Pipeline, PipelineConfig
class TestLogRecord:
"""Unit tests for LogRecord functionality."""
def test_record_creation(self):
"""Test basic record creation."""
record = LogRecord(
timestamp=1234567890.0,
level=LogLevel.INFO,
logger_name='test.module',
message='Test message'
)
assert record.timestamp == 1234567890.0
assert record.level == LogLevel.INFO
assert record.logger_name == 'test.module'
assert record.message == 'Test message'
assert record.context is None
def test_record_with_context(self):
"""Test record creation with context."""
context = LogContext(
ip='192.168.1.1',
user_id='user123',
custom={'trace_id': 'abc-123'}
)
record = LogRecord(
timestamp=1234567890.0,
level=LogLevel.INFO,
logger_name='test.module',
message='Test message',
context=context
)
assert record.context.ip == '192.168.1.1'
assert record.context.user_id == 'user123'
assert record.context.custom['trace_id'] == 'abc-123'
def test_record_immutability(self):
"""Test that records are immutable."""
record = LogRecord(
timestamp=1234567890.0,
level=LogLevel.INFO,
logger_name='test.module',
message='Test message'
)
# Should raise AttributeError when trying to modify
with pytest.raises(AttributeError):
record.message = 'Modified message'
with pytest.raises(AttributeError):
record.level = LogLevel.ERROR
Integration Tests
Test component interactions and workflows:
import pytest
import tempfile
import json
from pathlib import Path
from kakashi.core.pipeline import Pipeline, PipelineConfig
from kakashi.core.formatters import json_formatter
from kakashi.core.writers import file_writer
class TestPipelineIntegration:
"""Integration tests for pipeline components."""
def test_complete_logging_flow(self):
"""Test complete flow from record to file output."""
with tempfile.TemporaryDirectory() as temp_dir:
log_file = Path(temp_dir) / 'test.log'
# Create pipeline with file output
config = PipelineConfig(
min_level=LogLevel.INFO,
enrichers=(timestamp_enricher, context_enricher),
filters=(level_filter,),
formatter=json_formatter,
writers=(file_writer(log_file),)
)
pipeline = Pipeline(config)
# Process multiple records
records = [
LogRecord(
timestamp=1234567890.0,
level=LogLevel.INFO,
logger_name='test.app',
message='Application started'
),
LogRecord(
timestamp=1234567891.0,
level=LogLevel.WARNING,
logger_name='test.app',
message='Configuration warning',
context=LogContext(user_id='admin')
),
LogRecord(
timestamp=1234567892.0,
level=LogLevel.DEBUG, # Should be filtered out
logger_name='test.app',
message='Debug message'
)
]
for record in records:
pipeline.process(record)
# Verify output
assert log_file.exists()
lines = log_file.read_text().strip().split('\n')
assert len(lines) == 2 # DEBUG message filtered out
# Verify JSON structure
log1 = json.loads(lines[0])
assert log1['level'] == 'INFO'
assert log1['message'] == 'Application started'
log2 = json.loads(lines[1])
assert log2['level'] == 'WARNING'
assert log2['context']['user_id'] == 'admin'
def test_multi_writer_pipeline(self):
"""Test pipeline with multiple writers."""
with tempfile.TemporaryDirectory() as temp_dir:
file1 = Path(temp_dir) / 'app.log'
file2 = Path(temp_dir) / 'errors.log'
# Create pipeline with multiple writers
config = PipelineConfig(
min_level=LogLevel.INFO,
formatter=json_formatter,
writers=(
file_writer(file1),
conditional_file_writer(file2, min_level=LogLevel.ERROR)
)
)
pipeline = Pipeline(config)
# Process records of different levels
pipeline.process(LogRecord(
timestamp=1234567890.0,
level=LogLevel.INFO,
logger_name='test',
message='Info message'
))
pipeline.process(LogRecord(
timestamp=1234567891.0,
level=LogLevel.ERROR,
logger_name='test',
message='Error message'
))
# Verify outputs
assert file1.exists()
assert file2.exists()
# Both messages in app.log
app_lines = file1.read_text().strip().split('\n')
assert len(app_lines) == 2
# Only error message in errors.log
error_lines = file2.read_text().strip().split('\n')
assert len(error_lines) == 1
error_log = json.loads(error_lines[0])
assert error_log['level'] == 'ERROR'
Async Tests
Test asynchronous components:
import pytest
import asyncio
from kakashi.core.async_pipeline import AsyncPipeline, AsyncPipelineConfig
class TestAsyncPipeline:
"""Tests for async pipeline functionality."""
@pytest.mark.asyncio
async def test_async_pipeline_processing(self):
"""Test async pipeline processes records correctly."""
processed_messages = []
async def test_writer(message: str) -> None:
processed_messages.append(message)
config = AsyncPipelineConfig(
min_level=LogLevel.INFO,
formatter=json_formatter,
writers=(test_writer,),
buffer_size=10,
flush_interval=0.1
)
pipeline = AsyncPipeline(config)
# Process records
records = [
LogRecord(
timestamp=1234567890.0 + i,
level=LogLevel.INFO,
logger_name='test.async',
message=f'Message {i}'
)
for i in range(5)
]
for record in records:
await pipeline.process(record)
# Wait for flush
await asyncio.sleep(0.2)
# Verify all messages processed
assert len(processed_messages) == 5
for i, message in enumerate(processed_messages):
log_data = json.loads(message)
assert log_data['message'] == f'Message {i}'
@pytest.mark.asyncio
async def test_async_backpressure(self):
"""Test async pipeline handles backpressure correctly."""
processed_count = 0
dropped_count = 0
async def slow_writer(message: str) -> None:
nonlocal processed_count
await asyncio.sleep(0.01) # Simulate slow I/O
processed_count += 1
def drop_handler(record: LogRecord) -> None:
nonlocal dropped_count
dropped_count += 1
config = AsyncPipelineConfig(
min_level=LogLevel.INFO,
formatter=json_formatter,
writers=(slow_writer,),
buffer_size=10,
flush_interval=0.1,
backpressure_limit=20,
drop_handler=drop_handler
)
pipeline = AsyncPipeline(config)
# Send many records quickly to trigger backpressure
for i in range(50):
await pipeline.process(LogRecord(
timestamp=1234567890.0 + i,
level=LogLevel.INFO,
logger_name='test.backpressure',
message=f'Message {i}'
))
# Wait for processing
await asyncio.sleep(1.0)
await pipeline.flush()
# Verify some messages were dropped due to backpressure
assert dropped_count > 0
assert processed_count + dropped_count == 50
Performance Tests
Test performance characteristics:
import pytest
import time
import threading
from concurrent.futures import ThreadPoolExecutor
@pytest.mark.performance
class TestLoggingPerformance:
"""Performance tests for logging components."""
def test_single_thread_throughput(self):
"""Test single-thread logging throughput."""
# Create high-performance pipeline
config = PipelineConfig(
min_level=LogLevel.INFO,
formatter=fast_text_formatter,
writers=(noop_writer,) # No-op writer for pure pipeline testing
)
pipeline = Pipeline(config)
# Warm up
for _ in range(100):
pipeline.process(create_test_record())
# Benchmark
start_time = time.time()
num_records = 100000
for i in range(num_records):
pipeline.process(LogRecord(
timestamp=start_time + i * 0.000001,
level=LogLevel.INFO,
logger_name='perf.test',
message=f'Performance test message {i}'
))
elapsed = time.time() - start_time
throughput = num_records / elapsed
# Assert minimum throughput (adjust based on hardware)
assert throughput > 100000, f"Throughput {throughput:.0f} records/sec too low"
print(f"Single-thread throughput: {throughput:.0f} records/sec")
def test_concurrent_throughput(self):
"""Test concurrent logging throughput."""
config = PipelineConfig(
min_level=LogLevel.INFO,
formatter=json_formatter,
writers=(noop_writer,)
)
pipeline = Pipeline(config)
num_threads = 10
records_per_thread = 10000
def log_worker(thread_id: int) -> float:
start_time = time.time()
for i in range(records_per_thread):
pipeline.process(LogRecord(
timestamp=start_time + i * 0.000001,
level=LogLevel.INFO,
logger_name=f'perf.thread{thread_id}',
message=f'Thread {thread_id} message {i}'
))
return time.time() - start_time
# Run concurrent test
start_time = time.time()
with ThreadPoolExecutor(max_workers=num_threads) as executor:
thread_times = list(executor.map(log_worker, range(num_threads)))
total_time = time.time() - start_time
total_records = num_threads * records_per_thread
overall_throughput = total_records / total_time
print(f"Concurrent throughput: {overall_throughput:.0f} records/sec")
print(f"Average thread time: {sum(thread_times) / len(thread_times):.3f}s")
# Assert reasonable concurrent performance
assert overall_throughput > 50000, f"Concurrent throughput too low: {overall_throughput:.0f}"
def test_memory_usage(self):
"""Test memory usage during logging."""
import tracemalloc
import gc
tracemalloc.start()
config = PipelineConfig(
min_level=LogLevel.INFO,
formatter=json_formatter,
writers=(noop_writer,)
)
pipeline = Pipeline(config)
# Take initial snapshot
snapshot1 = tracemalloc.take_snapshot()
# Perform logging operations
for i in range(10000):
pipeline.process(LogRecord(
timestamp=time.time(),
level=LogLevel.INFO,
logger_name='memory.test',
message=f'Memory test message {i}',
context=LogContext(
user_id=f'user_{i % 100}',
custom={'iteration': i, 'data': 'x' * 50}
)
))
# Force garbage collection
gc.collect()
# Take final snapshot
snapshot2 = tracemalloc.take_snapshot()
# Analyze memory usage
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
total_memory = sum(stat.size_diff for stat in top_stats if stat.size_diff > 0)
# Memory usage should be reasonable (less than 10MB for 10k records)
memory_mb = total_memory / 1024 / 1024
assert memory_mb < 10, f"Memory usage too high: {memory_mb:.1f}MB"
print(f"Memory usage: {memory_mb:.1f}MB for 10,000 records")
Framework Integration Tests
Test web framework integrations:
import pytest
from unittest.mock import Mock, patch
class TestFastAPIIntegration:
"""Test FastAPI integration."""
@pytest.mark.skipif(not FASTAPI_AVAILABLE, reason="FastAPI not available")
def test_fastapi_middleware_setup(self):
"""Test FastAPI middleware setup."""
from fastapi import FastAPI
from kakashi.integrations.fastapi_integration import setup_fastapi_enterprise
app = FastAPI()
# Set up middleware
middleware = setup_fastapi_enterprise(app, service_name="test-api")
# Verify middleware was added
assert len(app.user_middleware) > 0
# Check middleware configuration
middleware_obj = app.user_middleware[0]
assert 'ObservabilityMiddleware' in str(middleware_obj.cls)
@pytest.mark.skipif(not FASTAPI_AVAILABLE, reason="FastAPI not available")
@pytest.mark.asyncio
async def test_fastapi_request_logging(self):
"""Test FastAPI request logging."""
from fastapi import FastAPI
from fastapi.testclient import TestClient
from kakashi.integrations.fastapi_integration import setup_fastapi_enterprise
app = FastAPI()
setup_fastapi_enterprise(app, service_name="test-api")
@app.get("/test")
async def test_endpoint():
return {"message": "test"}
client = TestClient(app)
# Capture logs
with patch('kakashi.info') as mock_log:
response = client.get("/test")
assert response.status_code == 200
# Verify request was logged
mock_log.assert_called()
# Check log content
call_args = mock_log.call_args
assert 'GET /test' in str(call_args)
class TestFlaskIntegration:
"""Test Flask integration."""
@pytest.mark.skipif(not FLASK_AVAILABLE, reason="Flask not available")
def test_flask_setup(self):
"""Test Flask integration setup."""
from flask import Flask
from kakashi.integrations.flask_integration import setup_flask_enterprise
app = Flask(__name__)
# Set up integration
handler = setup_flask_enterprise(app, service_name="test-flask")
# Verify handler was configured
assert handler is not None
assert hasattr(app, '_mylogs_setup')
assert app._mylogs_setup is True
@pytest.mark.skipif(not FLASK_AVAILABLE, reason="Flask not available")
def test_flask_request_logging(self):
"""Test Flask request logging."""
from flask import Flask
from kakashi.integrations.flask_integration import setup_flask_enterprise
app = Flask(__name__)
setup_flask_enterprise(app, service_name="test-flask")
@app.route('/test')
def test_endpoint():
return {"message": "test"}
client = app.test_client()
# Capture logs
with patch('kakashi.info') as mock_log:
response = client.get('/test')
assert response.status_code == 200
# Verify request was logged
mock_log.assert_called()
Test Utilities
Test Fixtures
Common test fixtures for reuse:
import pytest
import tempfile
from pathlib import Path
from kakashi.core.records import LogRecord, LogLevel, LogContext
@pytest.fixture
def temp_log_file():
"""Provide a temporary log file."""
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.log') as f:
log_file = Path(f.name)
yield log_file
# Cleanup
if log_file.exists():
log_file.unlink()
@pytest.fixture
def sample_log_record():
"""Provide a sample log record for testing."""
return LogRecord(
timestamp=1234567890.0,
level=LogLevel.INFO,
logger_name='test.module',
message='Test message',
context=LogContext(
ip='192.168.1.1',
user_id='test_user',
custom={'trace_id': 'abc-123'}
)
)
@pytest.fixture
def mock_writer():
"""Provide a mock writer for testing."""
messages = []
def writer(message: str) -> None:
messages.append(message)
writer.messages = messages
return writer
@pytest.fixture
def test_pipeline(mock_writer):
"""Provide a test pipeline with mock writer."""
config = PipelineConfig(
min_level=LogLevel.DEBUG,
formatter=json_formatter,
writers=(mock_writer,)
)
return Pipeline(config)
Test Helpers
Utility functions for testing:
def create_test_record(
level: LogLevel = LogLevel.INFO,
message: str = "Test message",
logger_name: str = "test.logger",
**context_fields
) -> LogRecord:
"""Create a test log record with optional context."""
context = None
if context_fields:
context = LogContext(**context_fields)
return LogRecord(
timestamp=time.time(),
level=level,
logger_name=logger_name,
message=message,
context=context
)
def assert_log_contains(log_output: str, expected_fields: dict):
"""Assert that log output contains expected fields."""
import json
log_data = json.loads(log_output)
for field, expected_value in expected_fields.items():
assert field in log_data, f"Field '{field}' not found in log output"
assert log_data[field] == expected_value, f"Field '{field}' has value {log_data[field]}, expected {expected_value}"
def wait_for_async_processing(pipeline: AsyncPipeline, timeout: float = 1.0):
"""Wait for async pipeline to process all pending records."""
import asyncio
async def wait():
start_time = time.time()
while pipeline.get_buffer_size() > 0 and time.time() - start_time < timeout:
await asyncio.sleep(0.01)
# Final flush
await pipeline.flush()
asyncio.run(wait())
Test Configuration
pytest Configuration
# pytest.ini
[tool:pytest]
testpaths = tests
python_files = test_*.py *_test.py
python_classes = Test*
python_functions = test_*
addopts =
--strict-markers
--disable-warnings
-v
--tb=short
markers =
unit: Unit tests
integration: Integration tests
performance: Performance tests
slow: Slow tests (can be skipped)
asyncio: Async tests
fastapi: FastAPI integration tests
flask: Flask integration tests
django: Django integration tests
# Async test configuration
asyncio_mode = auto
Coverage Configuration
# .coveragerc
[run]
source = kakashi
omit =
*/tests/*
*/examples/*
*/venv/*
*/build/*
[report]
exclude_lines =
pragma: no cover
def __repr__
raise AssertionError
raise NotImplementedError
if __name__ == .__main__.:
[html]
directory = htmlcov
Running Tests
Basic Test Execution
# Run all tests
pytest
# Run with coverage
pytest --cov=kakashi --cov-report=html
# Run specific test categories
pytest -m unit
pytest -m integration
pytest -m performance
# Run tests in parallel
pytest -n auto
# Run with verbose output
pytest -v -s
CI/CD Integration
# .github/workflows/test.yml
name: Tests
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.7, 3.8, 3.9, "3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e .[dev,all]
- name: Run tests
run: |
pytest --cov=kakashi --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
This comprehensive testing guide ensures Kakashi maintains high quality and reliability across all components and use cases.
Last updated: 2025-08-27 Contributors: [IntegerAlex]