Async Backends
Async Logging Architecture
For high-throughput applications, Kakashi provides async backends that move log I/O off the calling thread and batch writes for higher throughput.
When to Use Async Logging
- High-throughput applications (>1000 logs/second)
- I/O-bound workloads where logging shouldn't block request processing
- Production environments where performance is critical
- Applications with strict latency requirements
Basic Async Setup
from kakashi.core.async_interface import setup_async_logging
# Enable async logging for production
setup_async_logging(
    environment="production",
    service_name="my-api",
    version="1.0.0",
    max_queue_size=25000,   # bounded queue between callers and worker threads
    worker_count=2,         # background threads that drain the queue
    batch_size=200          # records written per I/O batch
)
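After setup, loggers obtained through the async interface enqueue records and return immediately; the worker threads format and write them in the background. A minimal sketch, assuming get_async_structured_logger (covered later on this page) picks up the backend configured above:

from kakashi.core.async_interface import get_async_structured_logger

logger = get_async_structured_logger("startup", service="my-api", version="1.0.0")

# The call returns as soon as the record is enqueued; formatting and
# I/O happen on the backend's worker threads.
logger.info("Service started", port=8080)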
Async Pipeline Components
AsyncPipeline
from kakashi.core.async_pipeline import AsyncPipeline, AsyncPipelineConfig
from kakashi.core.pipeline import PipelineConfig
from kakashi.core.async_backend import AsyncConfig
# Create a standard (synchronous) pipeline config.
# LogLevel, the enrichers, formatter, and writer used here are kakashi core
# components; their exact import paths are omitted in this example.
pipeline_config = PipelineConfig(
    min_level=LogLevel.INFO,
    enrichers=(thread_enricher, exception_enricher),
    filters=(),
    formatter=default_json_formatter,
    writers=(file_writer("app.log"),)
)
# Create the async config
async_config = AsyncConfig(
    max_queue_size=25000,
    worker_count=2,
    batch_size=200,
    batch_timeout=0.1,    # flush a partial batch after 100 ms
    enable_batching=True
)
# Combine them into an async pipeline config
async_pipeline_config = AsyncPipelineConfig(
    pipeline_config=pipeline_config,
    async_config=async_config,
    enable_async=True,
    fallback_to_sync=True   # write synchronously if the async backend can't accept the record
)
pipeline = AsyncPipeline(async_pipeline_config)
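With fallback_to_sync=True, the intent is that a record is still written, just synchronously, whenever the async path cannot accept it. A toy illustration of that decision (stand-in queue and writer, not kakashi's internals):

import queue

q = queue.Queue(maxsize=2)  # stand-in for the bounded async queue

def emit(record):
    try:
        q.put_nowait(record)  # fast path: hand the record to worker threads
    except queue.Full:
        print(record)         # fallback: degrade to a blocking, synchronous write

for r in ("a", "b", "c"):
    emit(r)  # "c" overflows the toy queue and takes the synchronous path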
Async Backend Configuration
from kakashi.core.async_backend import AsyncConfig, configure_async_backend
# Configure the global (process-wide) async backend
backend = configure_async_backend(
    max_queue_size=50000,
    worker_count=4,
    batch_size=500,
    enable_batching=True,
    overflow_strategy="drop_oldest"   # shed the oldest queued records when full
)
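The drop_oldest strategy sheds the oldest queued records under pressure so that new records never block the caller. A toy illustration of those semantics (not kakashi's queue implementation):

from collections import deque

q = deque(maxlen=3)  # bounded buffer that evicts from the left when full
for record in ("r1", "r2", "r3", "r4"):
    q.append(record)
print(list(q))  # ['r2', 'r3', 'r4'] -- r1, the oldest, was dropped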
High-Performance Async Logging
from kakashi.core.async_interface import get_high_performance_logger
# Get an ultra-high-performance logger
logger = get_high_performance_logger(
    name="trading",
    log_file="trades.log",
    max_queue_size=100000,
    worker_count=8,
    batch_size=1000
)
# In a high-frequency trading loop:
for trade in trades:
    logger.info("Trade executed", **trade.to_dict())  # microsecond-scale enqueue; I/O happens on the workers
Network Logging
from kakashi.core.async_interface import get_network_logger
import json

def send_to_elasticsearch(message):
    # Your Elasticsearch client code (es_client is your configured client)
    es_client.index(index="logs", body=json.loads(message))
logger = get_network_logger("api", send_to_elasticsearch)
logger.info("API call", endpoint="/users", response_time=45)
# Sent to Elasticsearch asynchronously
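Because the sender runs on the backend's worker threads, it is worth making it defensive: a transient Elasticsearch outage should not crash a worker. A sketch, assuming kakashi does not catch or retry sender failures for you (es_client is again your configured client):

import json
import logging

shipping_log = logging.getLogger("log-shipping")

def safe_send_to_elasticsearch(message):
    try:
        es_client.index(index="logs", body=json.loads(message))
    except Exception:
        # Record the failure locally instead of letting it propagate
        # into the worker thread.
        shipping_log.exception("failed to ship log message")

logger = get_network_logger("api", safe_send_to_elasticsearch)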
Performance Monitoring
Monitor async pipeline performance:
from kakashi.core.async_interface import get_async_stats
# Get runtime statistics
stats = get_async_stats()
print(f"Queue size: {stats['queue_size']}")
print(f"Messages processed: {stats['messages_enqueued']}")
print(f"Active workers: {stats['active_workers']}")
Configuration Examples
High-Throughput Web Server
from kakashi.core.async_interface import setup_async_logging
def setup_web_server_logging():
    """Optimized for web servers with high request volume."""
    setup_async_logging(
        environment="production",
        service_name="web-api",
        version="2.1.0",
        max_queue_size=100000,
        worker_count=8,
        batch_size=1000
    )
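As a rough sizing check: a 100,000-entry queue gives a service logging 10,000 messages per second about ten seconds of buffer (100,000 / 10,000) to ride out a stalled disk or network before the overflow strategy starts shedding records.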
Real-time Analytics
from kakashi.core.async_interface import get_async_structured_logger
def setup_analytics_logging():
    """Logger for real-time analytics with low latency."""
    logger = get_async_structured_logger(
        "analytics",
        service="user-analytics",
        version="1.0.0"
    )
    return logger
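Keyword arguments become structured fields on the emitted record, as in the earlier examples, so call sites stay terse:

logger = setup_analytics_logging()
logger.info("page_view", user_id=42, path="/dashboard", latency_ms=12)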
Error Handling and Recovery
from kakashi.core.async_interface import shutdown_async_backend
def graceful_shutdown():
    """Gracefully shut down the async logging backend."""
    try:
        shutdown_async_backend(timeout=10.0)  # drain queued messages, then stop the workers
        print("Async logging shutdown complete")
    except Exception as e:
        print(f"Error during shutdown: {e}")
Best Practices
- Queue Sizing: start with a max queue size of 25,000-50,000 and adjust from measured throughput
- Worker Count: use 2-4 workers for most applications; reserve 8+ for extreme throughput
- Batch Sizing: start with a batch size of 200-500 and increase it as throughput grows
- Monitoring: watch queue sizes and worker health in production (see get_async_stats above)
- Graceful Shutdown: always shut the backend down on exit so queued messages are flushed rather than lost, as in the snippet below
import atexit
from kakashi.core.async_interface import shutdown_async_backend

# Register shutdown handler
atexit.register(shutdown_async_backend)

# Or manually at exit:
def cleanup():
    shutdown_async_backend(timeout=10.0)
Last updated: 2025-08-27
Contributors: IntegerAlex