
Pipeline Composition

Creating Custom Pipelines

Kakashi's functional architecture allows you to compose custom logging pipelines from reusable components.

Basic Pipeline Creation

from kakashi.core.pipeline import Pipeline, PipelineConfig
from kakashi.core.records import LogLevel

# Create a simple console pipeline
config = PipelineConfig(
    min_level=LogLevel.INFO,
    enrichers=(thread_enricher, exception_enricher),
    filters=(),
    formatter=default_json_formatter,
    writers=(console_writer,)
)

pipeline = Pipeline(config)

Custom Enrichers

Enrichers add metadata to log records:

from kakashi.core.records import LogRecord
import time
import threading

def timestamp_enricher(record: LogRecord) -> LogRecord:
    """Add high-precision timestamp."""
    # Create a new record with an updated timestamp
    return LogRecord(
        timestamp=time.time(),
        level=record.level,
        logger_name=record.logger_name,
        message=record.message,
        fields=record.fields,
        context=record.context,
        exception=record.exception,
        exception_traceback=record.exception_traceback,
        module=record.module,
        function=record.function,
        line_number=record.line_number,
        thread_id=record.thread_id,
        thread_name=record.thread_name,
        process_id=record.process_id
    )

def thread_enricher(record: LogRecord) -> LogRecord:
    """Add thread information."""
    return LogRecord(
        timestamp=record.timestamp,
        level=record.level,
        logger_name=record.logger_name,
        message=record.message,
        fields=record.fields,
        context=record.context,
        exception=record.exception,
        exception_traceback=record.exception_traceback,
        module=record.module,
        function=record.function,
        line_number=record.line_number,
        thread_id=threading.get_ident(),
        thread_name=threading.current_thread().name,
        process_id=record.process_id
    )

# Thread-local storage must live at module level; creating it inside the
# enricher would give every call a fresh, empty object.
_correlation_local = threading.local()

def correlation_id_enricher(record: LogRecord) -> LogRecord:
    """Add correlation ID from thread-local storage."""
    correlation_id = getattr(_correlation_local, 'correlation_id', None)
    if correlation_id and record.context:
        # Create new context with correlation ID
        new_context = record.context.with_custom(correlation_id=correlation_id)
        return record.with_context(new_context)
    return record
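
The correlation ID only shows up if something stores it on the thread-local first. A minimal usage sketch, assuming a hypothetical request handler and the _correlation_local object defined above:

import uuid

def handle_request(request):
    # Hypothetical request handler: store a correlation ID for this thread
    # before any logging happens in the request.
    _correlation_local.correlation_id = str(uuid.uuid4())
    try:
        ...  # application code; records then pass through correlation_id_enricher
    finally:
        # Clear the value so it does not leak to the next request on this thread
        _correlation_local.correlation_id = None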

Custom Filters

Filters determine which records to process:

def sensitive_data_filter(record: LogRecord) -> bool:
    """Filter out logs containing sensitive data."""
    sensitive_patterns = ['password', 'token', 'secret']
    message_lower = record.message.lower()
    return not any(pattern in message_lower for pattern in sensitive_patterns)

def rate_limit_filter(record: LogRecord) -> bool:
    """Rate limit logs by logger name."""
    # A real implementation would track rate limits per logger.
    # This is a placeholder; one possible approach is sketched below this block.
    return True

def environment_filter(record: LogRecord) -> bool:
    """Only process logs for the current environment."""
    import os
    if record.context and record.context.environment:
        return record.context.environment == os.getenv('ENVIRONMENT', 'development')
    return True
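
One way to flesh out the rate-limit placeholder is a fixed-window counter per logger name. This is only a sketch, not part of Kakashi; the window size and limit are arbitrary, and a production version would also need to evict stale entries:

import time
from collections import defaultdict

_WINDOW_SECONDS = 1.0    # assumed window size
_MAX_PER_WINDOW = 100    # assumed per-logger limit
_windows = defaultdict(lambda: [0.0, 0])  # logger_name -> [window_start, count]

def windowed_rate_limit_filter(record: LogRecord) -> bool:
    """Allow at most _MAX_PER_WINDOW records per logger in each window."""
    # Note: not thread-safe; add a lock if filters can run concurrently.
    now = time.time()
    window = _windows[record.logger_name]
    if now - window[0] >= _WINDOW_SECONDS:
        window[0], window[1] = now, 0  # start a new window
    window[1] += 1
    return window[1] <= _MAX_PER_WINDOW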

Custom Formatters

Formatters convert records to strings:

import json
from datetime import datetime

def compact_json_formatter(record: LogRecord) -> str:
    """Compact JSON format for production."""
    data = {
        't': datetime.fromtimestamp(record.timestamp).isoformat(),
        'l': record.level.name,
        'n': record.logger_name,
        'm': record.message
    }

    if record.context:
        if record.context.ip:
            data['ip'] = record.context.ip
        if record.context.user_id:
            data['uid'] = record.context.user_id
        if record.context.custom:
            data.update(record.context.custom)

    return json.dumps(data, separators=(',', ':'))

def logfmt_formatter(record: LogRecord) -> str:
    """Logfmt format for easy parsing."""
    parts = [
        f'time={datetime.fromtimestamp(record.timestamp).isoformat()}',
        f'level={record.level.name}',
        f'logger={record.logger_name}',
        f'msg="{record.message}"'
    ]

    if record.context:
        if record.context.ip:
            parts.append(f'ip={record.context.ip}')
        if record.context.user_id:
            parts.append(f'user_id={record.context.user_id}')

    return ' '.join(parts)
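
Note that the msg value above is not escaped, so a message containing a double quote would break the logfmt output. A small sketch of one way to handle this, letting json.dumps do the quoting (the helper name is hypothetical):

import json

def quote_logfmt_value(value: str) -> str:
    """Return a logfmt-safe value: quoted and escaped only when necessary."""
    if ' ' in value or '"' in value or '=' in value:
        return json.dumps(value)  # adds surrounding quotes, escapes " and \
    return value

# In logfmt_formatter, the message part would then become:
#     f'msg={quote_logfmt_value(record.message)}'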

Custom Writers

Writers send formatted logs to destinations:

from pathlib import Path
import sys
import time

def rotating_file_writer(log_file: Path, max_size: int = 100_000_000):
    """File writer with size-based rotation."""
    def writer(message: str) -> None:
        try:
            if log_file.exists() and log_file.stat().st_size > max_size:
                # Rotate the current file out of the way
                backup_file = log_file.with_suffix(f'.{int(time.time())}.log')
                log_file.rename(backup_file)

            with log_file.open('a', encoding='utf-8') as f:
                f.write(message + '\n')
                f.flush()
        except (OSError, UnicodeError) as e:
            print(f"File write error: {e}", file=sys.stderr)

    return writer

def network_writer(endpoint: str, timeout: float = 5.0):
    """Send logs to a network endpoint."""
    import requests

    def writer(message: str) -> None:
        try:
            requests.post(
                endpoint,
                json={'log': message},
                timeout=timeout
            )
        except requests.RequestException as e:
            print(f"Network write error: {e}", file=sys.stderr)

    return writer
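
Both factories return plain callables, so they drop straight into the writers tuple of a PipelineConfig. The file path and endpoint URL below are placeholders:

config = PipelineConfig(
    min_level=LogLevel.INFO,
    enrichers=(thread_enricher,),
    filters=(sensitive_data_filter,),
    formatter=compact_json_formatter,
    writers=(
        rotating_file_writer(Path('logs/app.log')),
        network_writer('https://logs.example.com/ingest'),  # placeholder endpoint
    )
)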

Pipeline Composition Patterns

Multi-Destination Pipeline

def create_multi_destination_pipeline(log_dir: Path) -> Pipeline:
    """Pipeline that writes to both the console and a rotating file."""

    config = PipelineConfig(
        min_level=LogLevel.INFO,
        enrichers=(
            thread_enricher,
            exception_enricher,
        ),
        filters=(
            sensitive_data_filter,
            environment_filter,
        ),
        formatter=default_json_formatter,
        writers=(
            console_writer,
            rotating_file_writer(log_dir / 'app.log'),
        )
    )

    return Pipeline(config)
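
A short usage sketch; the 'logs' directory name is arbitrary, and creating it up front avoids the file writer hitting an OSError on every write:

log_dir = Path('logs')
log_dir.mkdir(parents=True, exist_ok=True)  # ensure the directory exists up front

pipeline = create_multi_destination_pipeline(log_dir)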

Environment-Specific Pipelines

def create_development_pipeline() -> Pipeline:
    """Development pipeline with verbose output."""
    return Pipeline(PipelineConfig(
        min_level=LogLevel.DEBUG,
        enrichers=(thread_enricher, exception_enricher),
        formatter=simple_text_formatter,
        writers=(console_writer,)
    ))

def create_production_pipeline(log_dir: Path) -> Pipeline:
    """Production pipeline optimized for performance."""
    return Pipeline(PipelineConfig(
        min_level=LogLevel.INFO,
        enrichers=(thread_enricher, exception_enricher),
        filters=(sensitive_data_filter,),
        formatter=default_json_formatter,
        writers=(
            rotating_file_writer(log_dir / 'app.log'),
        )
    ))

Conditional Pipeline Selection

def create_adaptive_pipeline() -> Pipeline:
    """Pipeline that adapts based on environment."""
    import os
    from pathlib import Path

    environment = os.getenv('ENVIRONMENT', 'development')

    if environment == 'development':
        return create_development_pipeline()
    elif environment == 'production':
        return create_production_pipeline(Path('/var/log/myapp'))
    else:  # testing
        return create_testing_pipeline()

def null_writer(message: str) -> None:
    """Discard formatted logs (keeps test runs quiet)."""
    pass

def create_testing_pipeline() -> Pipeline:
    """Testing pipeline with minimal overhead."""
    return Pipeline(PipelineConfig(
        min_level=LogLevel.WARNING,
        enrichers=(),
        filters=(),
        formatter=compact_json_formatter,
        writers=(null_writer,)
    ))

Performance Considerations

  • Hot Path Optimization: Place expensive enrichers after filters
  • Memory Efficiency: Use immutable data structures to enable sharing
  • Error Isolation: Each component should handle its own errors
  • Lazy Evaluation: Only compute expensive fields when needed (see the sketch at the end of this section)

Putting these guidelines together:

def performance_optimized_pipeline() -> Pipeline:
    """High-performance pipeline for production."""
    return Pipeline(PipelineConfig(
        min_level=LogLevel.INFO,
        # Fast (or no) filters first
        filters=(),
        # Lightweight enrichers
        enrichers=(thread_enricher, exception_enricher),
        # Optimized formatter
        formatter=default_json_formatter,
        # Efficient writers
        writers=(rotating_file_writer(Path('app.log')),)
    ))
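
To make the lazy-evaluation point concrete, an enricher can skip expensive work for records that will never need it. A sketch, assuming LogLevel values compare by severity and that the interesting caller sits two frames up the stack (both assumptions, not Kakashi guarantees):

import inspect

def lazy_caller_enricher(record: LogRecord) -> LogRecord:
    """Capture caller location only for ERROR and above."""
    if record.level.value < LogLevel.ERROR.value:
        return record  # cheap records skip the expensive stack inspection
    caller = inspect.stack()[2]  # assumed depth of the application caller
    return LogRecord(
        timestamp=record.timestamp,
        level=record.level,
        logger_name=record.logger_name,
        message=record.message,
        fields=record.fields,
        context=record.context,
        exception=record.exception,
        exception_traceback=record.exception_traceback,
        module=caller.frame.f_globals.get('__name__', record.module),
        function=caller.function,
        line_number=caller.lineno,
        thread_id=record.thread_id,
        thread_name=record.thread_name,
        process_id=record.process_id
    )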

Last updated: 2025-08-27 Contributors: [IntegerAlex]