The Complete Guide to Fluentd

    A comprehensive guide to mastering Fluentd for unified logging and data collection

    Prerequisites and Learning Path

    Prerequisites

    • Basic understanding of system administration (Linux/Windows)
    • Familiarity with log files and logging concepts
    • Basic knowledge of JSON and regular expressions
    • Understanding of network protocols (HTTP, TCP, UDP)
    • Basic command-line skills

    Learning Paths

    Beginner Path (0-2 weeks)

    Complete chapters 1-4, then 6 (basic outputs), 8 (basic routing), and 16 (essential best practices)

    Intermediate Path (2-6 weeks)

    Follow beginner path, then add chapters 5, 7, 9, 11, and 14

    Expert Path (6+ weeks)

    Complete all chapters including advanced topics: 10, 12, 13

    Table of Contents

    Part I: Foundations

    1. Introduction to Fluentd
    2. Architecture and Core Concepts
    3. Installation and Quick Start
    4. Configuration Fundamentals

    Part II: Core Components

    5. Input Plugins and Data Sources
    6. Output Plugins and Destinations
    7. Processing with Filters and Parsers
    8. Routing and Event Flow

    Part III: Advanced Operations

    9. Buffer Management and Reliability
    10. Performance Optimization
    11. Monitoring and Troubleshooting
    12. Security and Compliance

    Part IV: Expert Topics

    13. Advanced Patterns and Use Cases
    14. Custom Plugin Development
    15. Production Deployment Strategies

    Part V: Reference

    16. Best Practices and Patterns
    17. Troubleshooting Guide
    18. Plugin Reference
    19. Configuration Templates

    1. Introduction to Fluentd

    What is Fluentd?

    Fluentd is an open-source data collector that provides a unified logging layer, enabling you to collect, process, and route log data from various sources to multiple destinations. Think of it as a “data pipeline” that sits between your applications and your data storage systems.

    Why Fluentd?

    Problem it Solves

    Modern applications generate logs in different formats, locations, and volumes. Without a unified approach, you end up with:

    • Multiple custom scripts for each log source
    • Inconsistent log formats and timestamps
    • Difficult troubleshooting across services
    • Manual log shipping and rotation

    Fluentd’s Solution

    graph LR
        A[Multiple Log Sources] --> B[Fluentd]
        B --> C[Unified Processing]
        C --> D[Multiple Destinations]
    
        A1[Application Logs] --> B
        A2[System Logs] --> B
        A3[Database Logs] --> B
        A4[Container Logs] --> B
    
        B --> D1[Elasticsearch]
        B --> D2[S3]
        B --> D3[Kafka]
        B --> D4[Monitoring Systems]
    
        style B fill:#f9f,stroke:#333,stroke-width:2px

    Key Features

    1. Unified Logging

    • Collects logs from heterogeneous sources
    • Normalizes different log formats
    • Provides consistent timestamp handling
    • Enables centralized log management

    2. Pluggable Architecture

    • 500+ community-contributed plugins
    • Easy to extend and customize
    • Supports virtually any input/output system
    • Modular design for specific use cases

    3. Reliable Data Delivery

    • Built-in buffering mechanisms
    • Automatic retry with exponential backoff
    • At-least-once delivery guarantee
    • Graceful handling of downstream failures

    4. High Performance

    • Memory-efficient processing
    • Optimized for high-throughput scenarios
    • Horizontal scaling capabilities
    • Low-latency data forwarding

    5. JSON-Native Processing

    • Native JSON support throughout pipeline
    • Schema-less data handling
    • Easy field manipulation and enrichment
    • Efficient serialization/deserialization

    Comparison with Other Tools

    Fluentd vs. Logstash

    graph TB
        subgraph "Fluentd"
            F1[Ruby-based]
            F2[Lower Memory Usage]
            F3[Simpler Configuration]
            F4[Better for Forwarding]
        end
    
        subgraph "Logstash"
            L1[JRuby-based]
            L2[Higher Memory Usage]
            L3[More Complex Features]
            L4[Better for Processing]
        end
    
        style F1 fill:#e8f5e8
        style L1 fill:#fff3e0

    | Feature | Fluentd | Logstash | Use When |
    |---|---|---|---|
    | Memory Usage | Lower (~40MB) | Higher (~200MB) | Resource-constrained environments |
    | Configuration | Simple, declarative | More complex, procedural | Simple forwarding vs. complex processing |
    | Plugin Ecosystem | 500+ plugins | 200+ plugins | Need specific integrations |
    | Performance | High throughput | High processing power | Volume vs. transformation focus |

    Fluentd vs. Filebeat/Fluent Bit

    • Fluentd: Full-featured log processor and router
    • Filebeat/Fluent Bit: Lightweight log shippers
    • Use Together: Filebeat/Fluent Bit → Fluentd → Storage
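
    A minimal aggregator-side sketch of that layout, using Fluent Bit's forward output as the lightweight shipper (the port and the catch-all match are placeholders to adapt):

    # Central Fluentd aggregator (sketch)
    <source>
      @type forward        # receives events forwarded by Fluent Bit or other Fluentd nodes
      port 24224
      bind 0.0.0.0
    </source>

    <match **>
      @type stdout         # replace with elasticsearch, s3, kafka, ...
    </match>
    ANSI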

    When to Use Fluentd

    ✅ Ideal Use Cases

    • Multi-service Architectures: Microservices, containers, distributed systems
    • Cloud-native Applications: Kubernetes, Docker, serverless functions
    • Data Lake Ingestion: Collecting data for analytics and ML pipelines
    • Real-time Monitoring: Feeding monitoring and alerting systems
    • Compliance Logging: Audit trails, regulatory requirements
    • Log Aggregation: Centralizing logs from multiple sources

    ⚠️ Consider Alternatives When

    • Simple Single-source Logging: Basic file rotation might suffice
    • Extremely High-volume Streams: Kafka Connect might be better
    • Heavy Data Transformation: Logstash might be more suitable
    • Resource-critical Environments: Fluent Bit might be better

    Real-World Use Cases

    Case Study 1: E-commerce Platform

    graph TD
        A[Web Applications] --> F[Fluentd]
        B[Payment Service] --> F
        C[Inventory System] --> F
        D[User Analytics] --> F
    
        F --> E[Elasticsearch - Search/Debug]
        F --> G[S3 - Long-term Storage]
        F --> H[Kafka - Real-time Analytics]
        F --> I[Slack - Alerts]
    
        style F fill:#f9f,stroke:#333,stroke-width:2px

    Benefits Achieved:

    • Reduced MTTR from 2 hours to 15 minutes
    • Unified view across 20+ microservices
    • Real-time fraud detection through log analysis
    • Automated compliance reporting

    Case Study 2: IoT Data Pipeline

    graph LR
        A[IoT Devices] --> B[Edge Fluentd]
        B --> C[Central Fluentd]
        C --> D[Time Series DB]
        C --> E[ML Pipeline]
        C --> F[Dashboard]

    Configuration Example:

    # Edge processing
    <source>
      @type mqtt
      host sensor-gateway.local
      topic sensors/+/data
    </source>
    
    <filter sensors.**>
      @type record_transformer
      <record>
        location ${tag_parts[1]}
        timestamp ${Time.now.utc.iso8601}
      </record>
    </filter>
    
    <match sensors.**>
      @type forward
      <server>
        host central-fluentd.example.com
        port 24224
      </server>
    </match>
    ANSI

    Getting Started Checklist

    Before diving into Fluentd, ensure you have:

    • Environment Setup: Linux/Windows/macOS with admin access
    • Log Sources Identified: Know what logs you want to collect
    • Destinations Planned: Where do you want to send logs?
    • Basic Requirements: Understand volume, retention, and performance needs
    • Network Access: Ensure connectivity between sources and destinations

    Next Steps

    Now that you understand what Fluentd is and when to use it, let’s dive into its architecture and core concepts in the next chapter.


    2. Architecture and Core Concepts

    Fluentd Architecture

    graph TD
        A[Input Plugins] --> B[Engine]
        B --> C[Router]
        C --> D[Filter Plugins]
        D --> E[Output Plugins]
        E --> F[Buffer]
        F --> G[Destination]
    
        H[Parser] --> B
        I[Formatter] --> E
    
        style B fill:#e1f5fe
        style C fill:#f3e5f5
        style F fill:#fff3e0

    Core Components

    1. Input Plugins

    • Receive or pull data from external sources
    • Examples: tail, forward, http, syslog

    2. Parser Plugins

    • Parse incoming data into structured format
    • Examples: json, regexp, csv, apache2

    3. Filter Plugins

    • Modify, enrich, or filter event streams
    • Examples: grep, record_transformer, geoip

    4. Output Plugins

    • Send processed data to destinations
    • Examples: elasticsearch, s3, kafka, mongodb

    5. Formatter Plugins

    • Format data before sending to destinations
    • Examples: json, csv, msgpack, ltsv

    6. Buffer Plugins

    • Improve performance and reliability
    • Examples: memory, file
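
    Putting these together, a minimal single-pipeline sketch (paths, tags, and intervals are placeholders) shows where each component type sits:

    # Sketch: one pipeline touching each component type
    <source>
      @type tail                     # input plugin
      path /var/log/app.log
      pos_file /var/log/fluent/app.log.pos
      tag app.logs
      <parse>
        @type json                   # parser plugin
      </parse>
    </source>

    <filter app.logs>
      @type record_transformer       # filter plugin
      <record>
        hostname ${hostname}
      </record>
    </filter>

    <match app.logs>
      @type stdout                   # output plugin
      <format>
        @type json                   # formatter plugin
      </format>
      <buffer>
        @type memory                 # buffer plugin
        flush_interval 5s
      </buffer>
    </match>
    ANSI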

    Event Flow

    sequenceDiagram
        participant Source
        participant Input
        participant Engine
        participant Filter
        participant Output
        participant Destination
    
        Source->>Input: Raw Data
        Input->>Engine: Parsed Event
        Engine->>Filter: Tagged Event
        Filter->>Output: Processed Event
        Output->>Destination: Formatted Data

    Event Structure

    Every Fluentd event consists of:

    • Tag: Identifier for routing (e.g., myapp.access)
    • Time: Event timestamp
    • Record: JSON object containing the actual data
    {
      "tag": "myapp.access",
      "time": 1634567890,
      "record": {
        "host": "192.168.1.100",
        "method": "GET",
        "path": "/api/users",
        "status": 200,
        "response_time": 0.123
      }
    }
    JSON
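
    The same structure is what fluent-cat produces when you pipe JSON into it: the command argument becomes the tag, the current time becomes the event time, and the JSON body becomes the record (fluent-cat sends to a local forward input on port 24224 by default):

    echo '{"host":"192.168.1.100","method":"GET","path":"/api/users","status":200}' | \
    fluent-cat myapp.access
    Bash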

    3. Installation and Quick Start

    3.1 Installation Methods

    Method 1: Package Installation

    Ubuntu/Debian:

    curl -fsSL https://toolbelt.treasuredata.com/sh/install-ubuntu-focal-fluent-package5.sh | sh
    sudo systemctl start fluentd
    sudo systemctl enable fluentd
    Bash

    CentOS/RHEL:

    curl -fsSL https://toolbelt.treasuredata.com/sh/install-redhat-fluent-package5.sh | sh
    sudo systemctl start fluentd
    sudo systemctl enable fluentd
    Bash

    Windows:

    # Using Chocolatey
    choco install fluentd
    
    # Or download MSI from official website
    Bash

    Method 2: Docker

    # Pull official image
    docker pull fluentd:latest
    
    # Create config directory
    mkdir -p ./fluentd/etc
    
    # Run with custom config
    docker run -d \
      --name fluentd \
      -p 24224:24224 \
      -v $(pwd)/fluentd/etc:/fluentd/etc \
      -v /var/log:/var/log:ro \
      fluentd:latest
    Bash

    Method 3: Gem Installation (Development/Testing)

    gem install fluentd
    fluentd --setup ./fluent
    fluentd -c ./fluent/fluent.conf -vv &
    Bash

    3.2 Directory Structure

    /etc/fluent/
    ├── fluent.conf          # Main configuration file
    ├── plugin/              # Custom plugins directory
    ├── conf.d/             # Additional configuration files
    └── certs/              # SSL certificates
    
    /var/log/fluent/
    ├── buffer/             # Buffer storage
    ├── failed/             # Failed events
    └── fluentd.log         # Fluentd process logs
    Bash

    3.3 Quick Start Tutorial

    Let’s create a simple logging pipeline that demonstrates Fluentd’s core concepts.

    Step 1: Basic Configuration

    Create a basic configuration file:

    # fluent.conf
    <source>
      @type forward
      port 24224
      bind 0.0.0.0
    </source>
    
    <match **>
      @type stdout
    </match>
    ANSI

    Step 2: Test the Setup

    # Start Fluentd
    fluentd -c fluent.conf -vv
    
    # In another terminal, send a test message
    echo '{"message":"Hello Fluentd!","timestamp":"'$(date -Iseconds)'"}' | \
    fluent-cat test.logs
    Bash

    Expected output:

    2024-01-01 12:00:00.000000000 +0000 test.logs: {"message":"Hello Fluentd!","timestamp":"2024-01-01T12:00:00+00:00"}
    Bash

    Step 3: Add File Input

    Modify the configuration to read from log files:

    # fluent.conf
    <source>
      @type tail
      path /var/log/sample.log
      pos_file /var/log/fluent/sample.log.pos
      tag sample.logs
      format json
    </source>
    
    <match sample.logs>
      @type stdout
    </match>
    ANSI

    Create a test log file:

    echo '{"level":"INFO","message":"Application started","user_id":123}' >> /var/log/sample.log
    echo '{"level":"ERROR","message":"Database connection failed","error_code":500}' >> /var/log/sample.log
    Bash

    Step 4: Add Filtering

    Add a filter to process only ERROR level logs:

    # fluent.conf
    <source>
      @type tail
      path /var/log/sample.log
      pos_file /var/log/fluent/sample.log.pos
      tag sample.logs
      format json
    </source>
    
    <filter sample.logs>
      @type grep
      <regexp>
        key level
        pattern ERROR
      </regexp>
    </filter>
    
    <filter sample.logs>
      @type record_transformer
      <record>
        hostname ${hostname}
        processed_at ${Time.now.utc.iso8601}
      </record>
    </filter>
    
    <match sample.logs>
      @type stdout
    </match>
    ANSI

    Step 5: Multiple Outputs

    Send logs to both console and file:

    # fluent.conf
    <source>
      @type tail
      path /var/log/sample.log
      pos_file /var/log/fluent/sample.log.pos
      tag sample.logs
      format json
    </source>
    
    <filter sample.logs>
      @type record_transformer
      <record>
        hostname ${hostname}
        processed_at ${Time.now.utc.iso8601}
      </record>
    </filter>
    
    <match sample.logs>
      @type copy
    
      <store>
        @type stdout
      </store>
    
      <store>
        @type file
        path /var/log/fluent/processed.log
        format json
      </store>
    </match>
    ANSI

    3.4 Hands-on Lab: Complete Logging Pipeline

    Objective: Create a complete logging pipeline that collects, processes, and routes logs from a web application.

    Scenario: You have a web application that generates access logs and error logs. You want to:

    1. Collect both types of logs
    2. Parse and enrich them
    3. Send access logs to Elasticsearch
    4. Send error logs to Slack alerts
    5. Archive all logs to S3

    Lab Setup:

    1. Create sample log files:
    # Access logs
    mkdir -p /var/log/webapp
    cat > /var/log/webapp/access.log << 'EOF'
    192.168.1.100 - - [19/Sep/2024:10:00:00 +0000] "GET /api/users HTTP/1.1" 200 1234
    192.168.1.101 - - [19/Sep/2024:10:00:01 +0000] "POST /api/login HTTP/1.1" 200 567
    192.168.1.102 - - [19/Sep/2024:10:00:02 +0000] "GET /api/orders HTTP/1.1" 404 89
    EOF
    
    # Error logs
    
    cat > /var/log/webapp/error.log << 'EOF'
    {"timestamp":"2024-09-19T10:00:00Z","level":"ERROR","message":"Database connection timeout","service":"user-api"}
    {"timestamp":"2024-09-19T10:00:01Z","level":"WARN","message":"High memory usage detected","service":"order-api"}
    {"timestamp":"2024-09-19T10:00:02Z","level":"ERROR","message":"Payment gateway unreachable","service":"payment-api"}
    EOF
    Bash
    2. Create the Fluentd configuration:
    # webapp-logging.conf
    <system>
      log_level info
    </system>
    
    # Collect access logs
    <source>
      @type tail
      path /var/log/webapp/access.log
      pos_file /var/log/fluent/access.log.pos
      tag webapp.access
    
      <parse>
        @type apache2
      </parse>
    </source>
    
    # Collect error logs
    <source>
      @type tail
      path /var/log/webapp/error.log
      pos_file /var/log/fluent/error.log.pos
      tag webapp.error
    
      <parse>
        @type json
      </parse>
    </source>
    
    # Enrich all logs
    <filter webapp.**>
      @type record_transformer
      <record>
        environment production
        application webapp
        processed_at ${Time.now.utc.iso8601}
      </record>
    </filter>
    
    # Route access logs to "elasticsearch"
    <match webapp.access>
      @type file
      path /var/log/fluent/elasticsearch.log
      append true
    
      <format>
        @type json
      </format>
    </match>
    
    # Route error logs to "slack"
    <match webapp.error>
      @type file
      path /var/log/fluent/slack-alerts.log
      append true
    
      <format>
        @type json
      </format>
    </match>
    ANSI
    3. Run the lab:
    # Start Fluentd
    fluentd -c webapp-logging.conf -vv
    
    
    # In another terminal, append new logs
    
    echo '192.168.1.103 - - [19/Sep/2024:10:00:03 +0000] "DELETE /api/users/123 HTTP/1.1" 500 234' >> /var/log/webapp/access.log
    echo '{"timestamp":"2024-09-19T10:00:03Z","level":"ERROR","message":"Disk space critical","service":"storage-api"}' >> /var/log/webapp/error.log
    
    # Check outputs
    
    tail -f /var/log/fluent/elasticsearch.log
    tail -f /var/log/fluent/slack-alerts.log
    Bash

    Expected Results:

    • Access logs should appear in `/var/log/fluent/elasticsearch.log` with enriched fields
    • Error logs should appear in `/var/log/fluent/slack-alerts.log` with enriched fields
    • Both should include the environment, application, and processed_at fields
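
    A quick way to check, assuming the lab paths above (if out_file appends a time suffix to the file names, adjust the paths accordingly):

    # Count enriched records in each output
    grep -c 'processed_at' /var/log/fluent/elasticsearch.log
    grep -c 'processed_at' /var/log/fluent/slack-alerts.log

    # Spot-check one enriched record
    tail -n 1 /var/log/fluent/slack-alerts.log
    Bash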

    3.5 Validation and Testing

    Configuration Validation

    # Dry run to check configuration syntax
    fluentd -c fluent.conf --dry-run
    
    # Check for plugin availability
    fluentd --show-plugin-config=input:tail
    Bash

    Performance Testing

    # Generate test data
    for i in {1..1000}; do
      echo '{"id":'$i',"message":"Test message '$i'","timestamp":"'$(date -Iseconds)'"}' | \
      fluent-cat test.performance
    done
    
    # Monitor performance
    curl http://localhost:24220/api/plugins.json | jq '.plugins[] | select(.type=="input")'
    Bash

    3.6 Common Installation Issues

    | Issue | Symptoms | Solution |
    |---|---|---|
    | Permission denied | Cannot write to log directory | sudo chown -R fluentd:fluentd /var/log/fluent |
    | Port already in use | Address already in use (bind) | sudo netstat -tulpn \| grep <port> to find the conflicting process |
    | Plugin not found | Unknown input plugin 'xyz' | fluent-gem install fluent-plugin-xyz |
    | Out of memory | Process killed, high memory usage | Reduce buffer sizes in configuration |

    Next Steps

    Now that you have Fluentd installed and running, let’s dive deeper into configuration fundamentals in the next chapter.


    4. Configuration Fundamentals

    Configuration File Structure

    Fluentd uses a declarative configuration format with the following directives:

    # Comments start with #
    
    # Source directive - input plugins
    <source>
      @type [plugin_name]
      # plugin parameters
    </source>
    
    # Match directive - output plugins
    <match [tag_pattern]>
      @type [plugin_name]
      # plugin parameters
    </match>
    
    # Filter directive - filter plugins
    <filter [tag_pattern]>
      @type [plugin_name]
      # plugin parameters
    </filter>
    
    # System directive - global settings
    <system>
      # system parameters
    </system>
    ANSI

    Tag Patterns

    graph LR
        A["Tag: myapp.web.access"] --> B{Pattern Matching}
        B --> C["myapp.**"]
        B --> D["*.web.*"]
        B --> E["myapp.web.access"]
        B --> F["**"]
    
        style A fill:#e3f2fd
        style C fill:#e8f5e8
        style D fill:#e8f5e8
        style E fill:#e8f5e8
        style F fill:#e8f5e8

    Pattern Examples:

    • *: Matches any single tag part
    • **: Matches zero or more tag parts
    • {app1,app2}: Matches either app1 or app2
    • myapp.{web,api}.**: Matches myapp.web.* or myapp.api.*
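
    As a sketch of how these patterns look in practice (tags are illustrative), note that the first matching <match> wins, so order directives from specific to general:

    <match myapp.web.access>     # exact tag
      @type stdout
    </match>

    <match myapp.{web,api}.**>   # either component, any suffix
      @type stdout
    </match>

    <match myapp.**>             # anything under myapp
      @type stdout
    </match>

    <match **>                   # catch-all; keep it last
      @type stdout
    </match>
    ANSI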

    Common Parameters

    Time Format

    time_format %Y-%m-%d %H:%M:%S
    time_format_fallbacks %Y-%m-%d %H:%M:%S %z, %Y-%m-%d %H:%M:%S
    ANSI

    Include Files

    @include /etc/fluent/conf.d/*.conf
    ANSI

    Environment Variables

    host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
    port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
    ANSI

    Example: Complete Web Application Configuration

    # fluent.conf
    <system>
      log_level info
    </system>
    
    # Include additional configurations
    @include conf.d/*.conf
    
    # Web server access logs
    <source>
      @type tail
      path /var/log/nginx/access.log
      pos_file /var/log/fluent/nginx.access.log.pos
      tag nginx.access
      format nginx
    </source>
    
    # Application logs
    <source>
      @type tail
      path /var/log/myapp/*.log
      pos_file /var/log/fluent/myapp.log.pos
      tag myapp.logs
      format json
    </source>
    
    # Filter to add hostname
    <filter **>
      @type record_transformer
      <record>
        hostname ${hostname}
        service_name myapp
      </record>
    </filter>
    
    # Send to Elasticsearch
    <match nginx.access>
      @type elasticsearch
      host elasticsearch.example.com
      port 9200
      index_name nginx-access
      type_name _doc
    </match>
    
    <match myapp.logs>
      @type elasticsearch
      host elasticsearch.example.com
      port 9200
      index_name myapp-logs
      type_name _doc
    </match>
    ANSI

    5. Input Plugins

    Overview

    Input plugins define how Fluentd receives data. They run as separate threads and emit events to the Fluentd engine.

    graph TD
        A[Log Files] --> B[tail]
        C[HTTP Requests] --> D[http]
        E[Syslog] --> F[syslog]
        G[TCP/UDP] --> H[forward]
        I[Docker Containers] --> J[docker]
    
        B --> K[Fluentd Engine]
        D --> K
        F --> K
        H --> K
        J --> K
    
        style K fill:#f9f,stroke:#333,stroke-width:2px

    5.1 Tail Plugin

    Most commonly used for reading log files.

    <source>
      @type tail
      path /var/log/nginx/access.log
      pos_file /var/log/fluent/nginx.access.log.pos
      tag nginx.access
    
      <parse>
        @type nginx
      </parse>
    
      # Advanced options
      read_from_head true
      follow_inodes true
      refresh_interval 60
      rotate_wait 5
      enable_watch_timer true
      enable_stat_watcher true
      open_on_every_update false
      emit_unmatched_lines true
      ignore_repeated_permission_error true
    </source>
    ANSI

    Key Parameters:

    • path: File path (supports wildcards)
    • pos_file: Position file to track reading progress
    • tag: Tag assigned to events
    • read_from_head: Start reading from beginning of file
    • follow_inodes: Follow files even after rotation

    5.2 Forward Plugin

    Receives events from other Fluentd instances.

    <source>
      @type forward
      port 24224
      bind 0.0.0.0
    
      # Security options
      <security>
        self_hostname fluentd-server
        shared_key secret_string
      </security>
    
      # User authentication
      <user>
        username alice
        password secret_password
      </user>
    </source>
    ANSI

    5.3 HTTP Plugin

    Receives events via HTTP.

    <source>
      @type http
      port 9880
      bind 0.0.0.0
      body_size_limit 32m
      keepalive_timeout 10s
    
      # CORS support
      cors_allow_origins ["*"]
      cors_allow_methods ["POST"]
    
      <parse>
        @type json
        json_parser yajl
      </parse>
    
      # Custom response
      <response>
        @type json
      </response>
    </source>
    ANSI

    Usage:

    curl -X POST -d 'json={"event":"data"}' http://localhost:9880/app.log
    Bash

    5.4 Syslog Plugin

    Receives syslog messages.

    <source>
      @type syslog
      port 5140
      bind 0.0.0.0
      tag system.logs
      protocol_type udp
    
      <parse>
        @type syslog
        message_format rfc5424
        with_priority true
      </parse>
    </source>
    ANSI

    5.5 Docker Plugin

    Collects Docker container logs.

    <source>
      @type docker
      docker_url unix:///var/run/docker.sock
    
      # Container filtering
      <filter>
        container_name_regexp /^app-/
        exclude_container_name_regexp /test/
      </filter>
    
      <parse>
        @type json
      </parse>
    </source>
    ANSI

    5.6 Windows Event Log Plugin

    For Windows environments.

    <source>
      @type windows_eventlog
      channels application,system,security
      tag windows.eventlog
      read_interval 2
    
      <storage>
        @type local
        persistent true
        path C:\fluent\eventlog.json
      </storage>
    </source>
    ANSI

    5.7 Database Input

    Reading from databases.

    <source>
      @type sql
      host localhost
      port 3306
      database myapp
      username fluentd
      password secret
    
      tag_prefix mysql.myapp
      select_interval 60s
      select_limit 500
    
      <table>
        table users
        tag users
        update_column updated_at
        time_column created_at
      </table>
    </source>
    ANSI

    6. Output Plugins

    Overview

    Output plugins define where Fluentd sends processed data. They support buffering and retry mechanisms for reliability.

    graph TD
        A[Fluentd Engine] --> B[Elasticsearch]
        A --> C[AWS S3]
        A --> D[Kafka]
        A --> E[MongoDB]
        A --> F[File System]
        A --> G[HTTP Endpoint]
        A --> H[Email/Slack]
    
        style A fill:#f9f,stroke:#333,stroke-width:2px

    6.1 Elasticsearch Output

    Most popular output for log analytics.

    <match app.**>
      @type elasticsearch
      host elasticsearch.example.com
      port 9200
      scheme https
      user elastic
      password changeme
    
      # Index settings
      index_name app-logs
      type_name _doc
    
      # Dynamic index naming
      logstash_format true
      logstash_prefix app-logs
      logstash_dateformat %Y.%m.%d
    
      # Template management
      template_name app-template
      template_file /etc/fluent/elasticsearch-template.json
      template_overwrite true
    
      # Performance tuning
      bulk_message_request_threshold 1024
      bulk_message_flush_interval 60s
    
      # Retry and error handling
      retry_forever true
      retry_max_interval 30
      retry_randomize false
    
      # SSL configuration
      ssl_verify true
      ca_file /etc/ssl/certs/ca-certificates.crt
    
      # Buffer configuration
      <buffer>
        @type file
        path /var/log/fluent/elasticsearch
        flush_mode interval
        flush_interval 5s
        chunk_limit_size 2M
        queue_limit_length 8
        retry_type exponential_backoff
        retry_timeout 72h
      </buffer>
    </match>
    ANSI

    6.2 Amazon S3 Output

    For long-term log storage.

    <match app.**>
      @type s3
    
      # AWS credentials (use IAM roles in production)
      aws_key_id YOUR_AWS_KEY_ID
      aws_sec_key YOUR_AWS_SECRET_KEY
      s3_region us-west-2
      s3_bucket my-app-logs
    
      # Path and naming
      path logs/year=%Y/month=%m/day=%d/hour=%H/
      s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
    
      # File format
      <format>
        @type json
      </format>
    
      # Compression
      store_as gzip
    
      # Time slicing
      time_slice_format %Y%m%d%H
      time_slice_wait 10m
    
      # Buffer settings
      <buffer time>
        @type file
        path /var/log/fluent/s3
        timekey 3600
        timekey_wait 600
        timekey_use_utc true
        chunk_limit_size 256m
      </buffer>
    </match>
    ANSI

    6.3 Apache Kafka Output

    For real-time data streaming.

    <match app.**>
      @type kafka2
    
      # Kafka cluster
      brokers kafka1:9092,kafka2:9092,kafka3:9092
      default_topic app-logs
    
      # Topic routing
      <topic_config>
        app.web.** web-logs
        app.api.** api-logs
        app.db.** db-logs
      </topic_config>
    
      # Partitioning
      default_partition_key hostname
      partition_key_key user_id
    
      # Message format
      <format>
        @type json
      </format>
    
      # Producer settings
      required_acks -1                # -1 waits for all in-sync replicas (acks=all)
      compression_codec gzip
      max_send_retries 3
    
      # Buffer configuration
      <buffer topic>
        @type file
        path /var/log/fluent/kafka
        flush_mode immediate
        retry_type exponential_backoff
        retry_forever true
      </buffer>
    </match>
    ANSI

    6.4 MongoDB Output

    For document-based storage.

    <match app.**>
      @type mongo
    
      # Connection
      host localhost
      port 27017
      database app_logs
      collection logs
    
      # Authentication
      username fluentd
      password secret
    
      # Connection options
      ssl true
      ssl_cert /path/to/client.pem
      ssl_key /path/to/client.key
      ssl_ca_cert /path/to/ca.pem
    
      # Document structure
      capped true
      capped_size 1GB
      capped_max 1000000
    
      # Time handling
      time_key timestamp
      localtime true
    
      # Buffer settings
      <buffer>
        @type memory
        flush_mode interval
        flush_interval 10s
        chunk_limit_size 8m
        queue_limit_length 16
      </buffer>
    </match>
    ANSI

    6.5 File Output

    For local file storage.

    <match app.**>
      @type file
      path /var/log/fluent/app.%Y%m%d
      append true
    
      # File format
      <format>
        @type json
        time_key timestamp
        time_format %Y-%m-%d %H:%M:%S %z
      </format>
    
      # Compression
      compress gzip
    
      # Time slicing
      <buffer time>
        @type file
        path /var/log/fluent/buffer/app
        timekey 1h
        timekey_wait 5m
        timekey_use_utc false
      </buffer>
    </match>
    ANSI

    6.6 HTTP Output

    For webhook-style integrations.

    <match alert.**>
      @type http
      endpoint http://webhook.example.com/alerts
      http_method post
    
      # Headers
      headers {"Content-Type":"application/json"}
    
      # Authentication
      auth basic
      username webhook_user
      password webhook_pass
    
      # Format
      <format>
        @type json
      </format>
    
      # Retry configuration
      retries 3
      retry_wait 1s
      retry_max_interval 5s
    
      # SSL
      tls_verify_mode peer
      tls_ca_cert_path /etc/ssl/certs/ca-certificates.crt
    </match>
    ANSI

    6.7 Email/Slack Notifications

    # Email output
    <match critical.**>
      @type mail
      host smtp.gmail.com
      port 587
      user your_email@gmail.com
      password your_password
      enable_starttls_auto true
    
      from your_email@gmail.com
      to alerts@company.com
      subject Critical Alert: %s
      message %s
    
      out_keys tag,time,message
    </match>
    
    # Slack output
    <match alert.**>
      @type slack
      webhook_url https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
      channel alerts
      username fluentd
      icon_emoji :warning:
    
      message "Alert: %s"
      message_keys message
    
      color danger
      title Alert Details
      title_keys tag,hostname
    </match>
    ANSI

    7. Filter and Parser Plugins

    Overview

    Filter plugins modify event streams, while parser plugins structure incoming data.

    graph LR
        A[Raw Event] --> B[Parser Plugin]
        B --> C[Structured Event]
        C --> D[Filter Plugin]
        D --> E[Enhanced Event]
        E --> F[Output Plugin]
    
        style B fill:#e8f5e8
        style D fill:#fff3e0

    7.1 Parser Plugins

    JSON Parser

    <source>
      @type tail
      path /var/log/app.log
      tag app.logs
    
      <parse>
        @type json
        json_parser yajl
        time_key timestamp
        time_format %Y-%m-%d %H:%M:%S %z
        keep_time_key true
      </parse>
    </source>
    ANSI

    Regexp Parser

    <source>
      @type tail
      path /var/log/nginx/access.log
      tag nginx.access
    
      <parse>
        @type regexp
        expression /^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$/
        time_format %d/%b/%Y:%H:%M:%S %z
      </parse>
    </source>
    ANSI

    Multi-format Parser

    <source>
      @type tail
      path /var/log/mixed.log
      tag mixed.logs
    
      <parse>
        @type multi_format
    
        <pattern>
          format json
        </pattern>
    
        <pattern>
          format regexp
          expression /^(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(?<level>\w+)\] (?<message>.*)$/
          time_format %Y-%m-%d %H:%M:%S
        </pattern>
    
        <pattern>
          format none
        </pattern>
      </parse>
    </source>
    ANSI

    CSV Parser

    <source>
      @type tail
      path /var/log/data.csv
      tag csv.data
    
      <parse>
        @type csv
        keys id,name,email,created_at
        time_key created_at
        time_format %Y-%m-%d %H:%M:%S
      </parse>
    </source>
    ANSI

    7.2 Filter Plugins

    Record Transformer

    Add, modify, or remove fields.

    <filter app.**>
      @type record_transformer
      enable_ruby true
    
      <record>
        hostname ${hostname}
        tag ${tag}
        timestamp ${time}
    
        # Conditional fields
        environment ${tag_parts[1] == "prod" ? "production" : "development"}
    
        # Computed fields
        response_time_ms ${record["response_time"].to_f * 1000}
    
        # String manipulation
        normalized_path ${record["path"].downcase.gsub(/\d+/, 'ID')}
    
        # Add metadata
        service_name myapp
        version 1.2.3
      </record>
    
      # Remove sensitive fields
      remove_keys password,secret_key
    
      # Note: record_transformer has no rename_keys parameter; renaming fields
      # requires a separate plugin such as fluent-plugin-rename-key
    </filter>
    ANSI

    Grep Filter

    Filter events based on conditions.

    # Include only specific levels
    <filter app.**>
      @type grep
      <regexp>
        key level
        pattern ^(ERROR|WARN|FATAL)$
      </regexp>
    </filter>
    
    # Exclude health check requests
    <filter nginx.access>
      @type grep
      <exclude>
        key path
        pattern ^/health$
      </exclude>
    </filter>
    
    # Multiple conditions (AND)
    <filter app.**>
      @type grep
      <regexp>
        key level
        pattern ERROR
      </regexp>
      <regexp>
        key service
        pattern web
      </regexp>
    </filter>
    
    # Multiple conditions (OR)
    <filter app.**>
      @type grep
      <or>
        <regexp>
          key level
          pattern ERROR
        </regexp>
        <regexp>
          key level
          pattern FATAL
        </regexp>
      </or>
    </filter>
    ANSI

    GeoIP Filter

    Add geographical information based on IP addresses.

    <filter nginx.access>
      @type geoip
    
      # GeoIP database
      geoip_database /etc/fluent/GeoLite2-City.mmdb
    
      # Field mapping
      <record>
        geoip_country ${city.country.iso_code}
        geoip_city ${city.city.names.en}
        geoip_latitude ${location.latitude}
        geoip_longitude ${location.longitude}
        geoip_region ${city.subdivisions.0.iso_code}
      </record>
    
      # IP field name
      geoip_lookup_keys remote_addr,x_forwarded_for
    
      # Skip private IPs
      skip_adding_null_record true
    </filter>
    ANSI

    Kubernetes Metadata Filter

    Enrich container logs with Kubernetes metadata.

    <filter kubernetes.**>
      @type kubernetes_metadata
    
      # Kubernetes API
      kubernetes_url https://kubernetes.default.svc.cluster.local
      bearer_token_file /var/run/secrets/kubernetes.io/serviceaccount/token
      ca_file /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    
      # Metadata to include
      annotation_match [".*"]
      de_dot false
      de_dot_separator _
    
      # Cache settings
      cache_size 1000
      cache_ttl 3600
    </filter>
    ANSI

    Throttle Filter

    Rate limiting for events.

    <filter app.**>
      @type throttle
      group_key kubernetes.container_name
      group_bucket_period_s 60
      group_bucket_limit 100
    
      # What to do when throttled
      group_drop_logs true
      group_warning_delay_s 10
    </filter>
    ANSI

    Dedupe Filter

    Remove duplicate events.

    <filter app.**>
      @type dedupe
      key message,level
      cache_ttl 300
      cache_size 1000
    </filter>
    ANSI

    Concat Filter

    Combine multi-line logs.

    <filter app.**>
      @type concat
      key message
      separator ""
    
      # Java stack traces
      multiline_start_regexp /^\d{4}-\d{2}-\d{2}/
      multiline_end_regexp /^$/
      continuous_line_regexp /^\s/
    
      # Stream identity
      stream_identity_key container_id
    
      # Timeout for incomplete multiline
      flush_interval 5s
      timeout_label "@NORMAL"
    </filter>
    ANSI

    7.3 Advanced Filter Patterns

    Conditional Processing

    <filter app.**>
      @type record_transformer
      enable_ruby true
    
      <record>
        # Conditional field based on multiple conditions
        alert_level ${
          if record["status_code"].to_i >= 500
            "critical"
          elsif record["status_code"].to_i >= 400
            "warning"
          elsif record["response_time"].to_f > 1.0
            "performance"
          else
            "normal"
          end
        }
    
        # Dynamic routing tag
        routing_tag ${
          case record["service"]
          when "web"
            "app.web.processed"
          when "api"
            "app.api.processed"
          else
            "app.unknown.processed"
          end
        }
      </record>
    </filter>
    ANSI

    Data Enrichment Pipeline

    # Step 1: Parse user agent
    <filter web.access>
      @type parser
      key_name user_agent
      reserve_data true
      inject_key_prefix ua_
    
      <parse>
        @type user_agent
      </parse>
    </filter>
    
    # Step 2: Add GeoIP information
    <filter web.access>
      @type geoip
      geoip_lookup_keys remote_addr
      <record>
        country ${city.country.iso_code}
        city ${city.city.names.en}
      </record>
    </filter>
    
    # Step 3: Classify requests
    <filter web.access>
      @type record_transformer
      enable_ruby true
    
      <record>
        request_type ${
          if record["path"] =~ /^\/api\//
            "api"
          elsif record["path"] =~ /\.(css|js|png|jpg|gif)$/
            "static"
          else
            "page"
          end
        }
    
        is_bot ${record["ua_name"] =~ /(bot|crawler|spider)/i ? true : false}
      </record>
    </filter>
    ANSI

    8. Routing and Tagging

    Overview

    Fluentd’s routing system uses tags to determine how events flow through the pipeline.

    graph TD
        A[Input: myapp.web.access] --> B{Router}
        B --> |myapp.web.**| C[Web Processing]
        B --> |myapp.api.**| D[API Processing]
        B --> |myapp.**| E[General Processing]
        B --> |**| F[Catch All]
    
        C --> G[Elasticsearch: web-logs]
        D --> H[Kafka: api-events]
        E --> I[S3: app-logs]
        F --> J[File: unknown.log]
    
        style B fill:#f9f,stroke:#333,stroke-width:2px

    8.1 Tag Structure and Conventions

    Hierarchical Tagging

    application.component.environment.type
    └─ myapp.web.prod.access
    └─ myapp.api.staging.error
    └─ myapp.db.prod.slow_query
    └─ system.nginx.prod.access
    └─ security.auth.prod.failed_login
    ANSI

    Best Practices for Tag Naming

    # Good: Hierarchical and descriptive
    myapp.web.access
    myapp.api.error  
    system.nginx.access
    security.auth.failure
    
    # Avoid: Flat or unclear naming
    logs
    web_logs
    error_messages
    ANSI

    8.2 Routing Patterns

    Basic Routing

    # Exact match
    <match myapp.web.access>
      @type elasticsearch
      index_name web-access-logs
    </match>
    
    # Wildcard match
    <match myapp.web.*>
      @type file
      path /var/log/web/%Y%m%d
    </match>
    
    # Multi-level wildcard
    <match myapp.**>
      @type s3
      s3_bucket myapp-logs
    </match>
    
    # Multiple patterns
    <match {nginx,apache}.access>
      @type elasticsearch
      index_name web-servers
    </match>
    ANSI

    Complex Routing Examples

    # Route by environment
    <match *.prod.**>
      @type elasticsearch
      host prod-elasticsearch.example.com
      index_name prod-logs
    </match>
    
    <match *.staging.**>
      @type elasticsearch  
      host staging-elasticsearch.example.com
      index_name staging-logs
    </match>
    
    # Route by log level
    <match **.error>
      @type slack
      webhook_url https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
      channel alerts
    </match>
    
    <match **.{warn,error,fatal}>
      @type elasticsearch
      index_name error-logs
    </match>
    
    # Route by service
    <match web.**>
      @type kafka
      default_topic web-events
    </match>
    
    <match api.**>
      @type kafka
      default_topic api-events
    </match>
    ANSI

    8.3 Tag Manipulation

    Re-tagging Events

    <match input.logs>
      @type rewrite_tag_filter
    
      <rule>
        key level
        pattern ERROR
        tag error.${tag}
      </rule>
    
      <rule>
        key level
        pattern WARN
        tag warning.${tag}
      </rule>
    
      <rule>
        key level
        pattern INFO
        tag info.${tag}
      </rule>
    
      # Default case
      <rule>
        key level
        pattern .*
        tag processed.${tag}
      </rule>
    </match>
    
    # Process re-tagged events
    <match error.**>
      @type slack
      webhook_url YOUR_WEBHOOK_URL
      channel alerts
    </match>
    
    <match {warning,info}.**>
      @type elasticsearch
      index_name application-logs
    </match>
    ANSI

    Dynamic Tagging

    <filter app.**>
      @type record_transformer
      enable_ruby true
    
      <record>
        # Add routing information to record
        routing_key ${
          case record["severity"]
          when "high"
            "alert"
          when "medium"  
            "warning"
          else
            "info"
          end
        }
      </record>
    </filter>
    
    <match app.**>
      @type rewrite_tag_filter
    
      <rule>
        key routing_key
        pattern alert
        tag alert.${tag}
      </rule>
    
      <rule>
        key routing_key
        pattern warning
        tag warning.${tag}
      </rule>
    
      <rule>
        key routing_key
        pattern .*
        tag processed.${tag}
      </rule>
    </match>
    ANSI

    8.4 Multi-destination Routing

    Copy Plugin

    Send events to multiple outputs.

    <match app.**>
      @type copy
    
      # Send to Elasticsearch for searching
      <store>
        @type elasticsearch
        host elasticsearch.example.com
        index_name app-logs
      </store>
    
      # Send to S3 for archival  
      <store>
        @type s3
        s3_bucket app-logs-archive
        path logs/%Y/%m/%d/
      </store>
    
      # Route a copy to a label so critical events can be filtered for Slack
      <store ignore_error>
        @type relabel
        @label @CRITICAL_ALERTS
      </store>
    </match>
    
    # Only ERROR/FATAL events reach Slack
    <label @CRITICAL_ALERTS>
      <filter **>
        @type grep
        <regexp>
          key level
          pattern (ERROR|FATAL)
        </regexp>
      </filter>
    
      <match **>
        @type slack
        webhook_url YOUR_WEBHOOK_URL
        channel alerts
      </match>
    </label>
    ANSI

    Conditional Copy

    <match app.**>
      @type copy
    
      # Always send to main storage
      <store>
        @type elasticsearch
        index_name app-logs
      </store>
    
      # Route copies to labels for conditional handling
      <store>
        @type relabel
        @label @ERROR_ALERTS
      </store>
    
      <store>
        @type relabel
        @label @ENV_ARCHIVE
      </store>
    </match>
    
    # Only send errors to alerting
    <label @ERROR_ALERTS>
      <filter **>
        @type grep
        <regexp>
          key level
          pattern ERROR
        </regexp>
      </filter>
    
      <match **>
        @type slack
        webhook_url YOUR_WEBHOOK_URL
      </match>
    </label>
    
    # Send to different S3 buckets based on environment
    <label @ENV_ARCHIVE>
      <match app.**>
        @type rewrite_tag_filter
        <rule>
          key environment
          pattern ^production$
          tag archive.production
        </rule>
        <rule>
          key environment
          pattern ^staging$
          tag archive.staging
        </rule>
      </match>
    
      <match archive.production>
        @type s3
        s3_bucket prod-logs
      </match>
    
      <match archive.staging>
        @type s3
        s3_bucket staging-logs
      </match>
    </label>
    ANSI

    8.5 Label Routing

    Advanced routing using labels for complex flows.

    <source>
      @type forward
      port 24224
      @label @mainstream
    </source>
    
    <source>
      @type tail
      path /var/log/secure
      tag security.auth
      @label @security
    </source>
    
    # Main processing pipeline
    <label @mainstream>
      <filter **>
        @type record_transformer
        <record>
          processed_by mainstream
        </record>
      </filter>
    
      <match app.**>
        @type elasticsearch
        index_name app-logs
      </match>
    
      <match system.**>
        @type file
        path /var/log/fluent/system.log
      </match>
    </label>
    
    # Security-specific processing
    <label @security>
      <filter **>
        @type grep
        <regexp>
          key message
          pattern authentication failure
        </regexp>
      </filter>
    
      <match **>
        @type slack
        webhook_url YOUR_WEBHOOK_URL
        channel security-alerts
      </match>
    </label>
    
    # Error handling label
    <label @ERROR>
      <match **>
        @type file
        path /var/log/fluent/error.log
      </match>
    </label>
    ANSI

    8.6 Routing Best Practices

    1. Consistent Tag Hierarchy

    # Use consistent naming conventions
    service.component.environment.type
    myapp.web.prod.access
    myapp.api.prod.error
    myapp.db.prod.query
    
    # Avoid mixing conventions
    myapp.web.access.prod  # Inconsistent
    web_prod_access        # Flat structure
    ANSI

    2. Performance Considerations

    # Order match directives from specific to general
    <match myapp.web.prod.error>        # Most specific
      @type slack
    </match>
    
    <match myapp.web.**>                 # More general
      @type elasticsearch
    </match>
    
    <match myapp.**>                     # General
      @type s3
    </match>
    
    <match **>                           # Catch-all (last)
      @type file
      path /var/log/fluent/unmatched.log
    </match>
    ANSI

    3. Testing and Debugging Routes

    # Add debug output to test routing
    <match **>
      @type copy
    
      # Your normal output
      <store>
        @type elasticsearch
        index_name app-logs
      </store>
    
      # Debug output (remove in production)
      <store>
        @type stdout
        output_type json
      </store>
    </match>
    ANSI

    9. Buffer Management

    Overview

    Buffering is crucial for Fluentd’s reliability and performance. It handles temporary storage, retry logic, and output delivery guarantees.

    graph TB
        A[Input Events] --> B[Buffer]
        B --> C{Flush Condition Met?}
        C -->|Yes| D[Output Plugin]
        C -->|No| B
        D --> E{Success?}
        E -->|Yes| F[Acknowledge]
        E -->|No| G[Retry Queue]
        G --> H{Max Retries?}
        H -->|No| D
        H -->|Yes| I[Dead Letter Queue]
    
        style B fill:#e3f2fd
        style G fill:#fff3e0
        style I fill:#ffebee

    9.1 Buffer Types

    Memory Buffer

    Fast but volatile – data is lost on restart.

    <match app.**>
      @type elasticsearch
    
      <buffer>
        @type memory
        chunk_limit_size 8MB
        queue_limit_length 16
        flush_mode interval
        flush_interval 5s
        overflow_action throw_exception
      </buffer>
    </match>
    ANSI

    File Buffer

    Persistent but slower – data survives restarts.

    <match app.**>
      @type elasticsearch
    
      <buffer>
        @type file
        path /var/log/fluent/buffer/elasticsearch
        chunk_limit_size 32MB
        queue_limit_length 64
        flush_mode interval
        flush_interval 10s
        overflow_action block
    
        # File-specific settings
        file_permission 0644
        dir_permission 0755
      </buffer>
    </match>
    ANSI

    9.2 Buffer Parameters

    Core Buffer Settings

    <buffer>
      # Buffer type
      @type file
      path /var/log/fluent/buffer/app
    
      # Chunk settings
      chunk_limit_size 32MB           # Max size per chunk
      chunk_limit_records 1000        # Max records per chunk
    
      # Queue settings  
      queue_limit_length 128          # Max chunks in queue
      queued_chunks_limit_size 1GB    # Max total queue size
    
      # Flush settings
      flush_mode interval             # interval, immediate, lazy
      flush_interval 10s              # How often to flush
      flush_at_shutdown true          # Flush on shutdown
    
      # Threading
      flush_thread_count 2            # Parallel flush threads
    
      # Overflow behavior
      overflow_action block           # block, throw_exception, drop_oldest_chunk
    
      # Retry settings
      retry_type exponential_backoff  # exponential_backoff, periodic
      retry_wait 1s                   # Initial retry wait
      retry_max_interval 60s          # Max retry interval
      retry_max_times 17              # Max retry attempts
      retry_forever false             # Retry indefinitely
      retry_timeout 72h               # Give up after this time
    
      # Secondary output on failure
      retry_secondary_threshold 0.8   # When to try secondary
    </buffer>
    ANSI

    Time-based Buffering

    <match app.**>
      @type s3
    
      <buffer time>
        @type file
        path /var/log/fluent/buffer/s3
    
        # Time-based chunking
        timekey 1h                    # 1 hour chunks
        timekey_wait 5m               # Wait 5min for late events
        timekey_use_utc true          # Use UTC timestamps
        # timekey_zone +0900         # Alternative: explicit timezone instead of UTC
    
        # Time boundaries
        flush_mode interval
        flush_interval 10m
      </buffer>
    </match>
    ANSI

    Tag-based Buffering

    <match app.**>
      @type elasticsearch
    
      <buffer tag>
        @type file
        path /var/log/fluent/buffer/elasticsearch
    
        # Each tag gets separate chunks
        chunk_limit_size 16MB
        flush_mode immediate
      </buffer>
    </match>
    ANSI

    Custom Key Buffering

    <match app.**>
      @type kafka
    
      <buffer hostname,level>
        @type memory
    
        # Buffer by hostname and log level
        chunk_limit_size 8MB
        flush_mode interval
        flush_interval 30s
      </buffer>
    </match>
    ANSI

    9.3 Buffer Monitoring

    Buffer Metrics

    Fluentd exposes buffer metrics for monitoring:

    <system>
      enable_input_metrics true
      enable_size_metrics true
    </system>
    
    # Monitor buffer metrics
    <source>
      @type monitor_agent
      bind 0.0.0.0
      port 24220
      include_config false
    </source>
    
    # Export metrics to Prometheus
    <source>
      @type prometheus
      bind 0.0.0.0
      port 24231
      metrics_path /metrics
    </source>
    
    <source>
      @type prometheus_output_monitor
      interval 10
      <labels>
        hostname ${hostname}
      </labels>
    </source>
    ANSI

    Key metrics to monitor:

    • buffer_queue_length: Number of chunks in queue
    • buffer_total_queued_size: Total size of queued data
    • retry_count: Number of retries
    • emit_count: Number of events emitted
    • emit_size: Size of emitted data
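
    With the monitor_agent source configured above, these metrics can be pulled as JSON; a quick sketch using curl and jq (field names follow the monitor agent's per-plugin objects):

    curl -s http://localhost:24220/api/plugins.json | \
      jq '.plugins[] | {plugin_id, type, buffer_queue_length, buffer_total_queued_size, retry_count}'
    Bash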

    9.4 Advanced Buffer Configurations

    High-throughput Configuration

    <match app.**>
      @type elasticsearch
    
      <buffer>
        @type file
        path /var/log/fluent/buffer/high-throughput
    
        # Large chunks for efficiency
        chunk_limit_size 64MB
        chunk_limit_records 100000
    
        # More parallel processing
        flush_thread_count 4
        queue_limit_length 256
    
        # Aggressive flushing
        flush_mode immediate
        flush_interval 1s
    
        # Handle overflow
        overflow_action drop_oldest_chunk
      </buffer>
    </match>
    ANSI

    Low-latency Configuration

    <match app.**>
      @type elasticsearch
    
      <buffer>
        @type memory
    
        # Small chunks for low latency
        chunk_limit_size 1MB
        chunk_limit_records 1000
    
        # Immediate flushing
        flush_mode immediate
    
        # Minimal queuing
        queue_limit_length 8
    
        # Fast retries
        retry_type exponential_backoff
        retry_wait 0.1s
        retry_max_interval 5s
      </buffer>
    </match>
    ANSI

    Reliable Configuration

    <match critical.**>
      @type elasticsearch
    
      <buffer>
        @type file
        path /var/log/fluent/buffer/critical
    
        # Persistent storage
        file_permission 0600
        dir_permission 0700
    
        # Conservative settings
        chunk_limit_size 8MB
        queue_limit_length 32
    
        # Reliable flushing
        flush_mode interval
        flush_interval 30s
        flush_at_shutdown true
    
        # Extensive retries
        retry_forever true
        retry_max_interval 300s
    
        # Block on overflow (don't lose data)
        overflow_action block
      </buffer>
    
      # Secondary output for failures
      <secondary>
        @type file
        path /var/log/fluent/failed/critical.%Y%m%d.log
    
        <format>
          @type json
        </format>
      </secondary>
    </match>
    ANSI

    9.5 Buffer Troubleshooting

    Common Buffer Issues

    1. Buffer Overflow
    # Symptoms: "buffer overflow" errors
    # Solutions:
    <buffer>
    # Increase buffer size
    queue_limit_length 256
    chunk_limit_size 64MB
    
    # Handle overflow gracefully
    overflow_action drop_oldest_chunk
    
    # Increase flush rate
    flush_interval 5s
    flush_thread_count 4
    </buffer>
    ANSI
    2. Slow Draining
    # Symptoms: Growing buffer queue
    # Solutions:
    <buffer>
    # More parallel processing
    flush_thread_count 8
    
    # Larger chunks (fewer requests)
    chunk_limit_size 32MB
    
    # Immediate mode for faster processing
    flush_mode immediate
    </buffer>
    ANSI
    3. Memory Issues
    # Symptoms: High memory usage
    # Solutions:
    <buffer>
    # Use file buffer instead of memory
    @type file
    path /var/log/fluent/buffer
    
    # Limit memory usage
    chunk_limit_size 8MB
    queue_limit_length 16
    
    # Flush more frequently
    flush_interval 10s
    </buffer>
    ANSI

    Buffer Debugging

    # Enable debug logging
    <system>
      log_level debug
    </system>
    
    # Monitor buffer status
    <source>
      @type monitor_agent
      bind 0.0.0.0
      port 24220
    </source>
    
    # Inspect buffer status via the monitor agent endpoint:
    #   curl http://localhost:24220/api/plugins.json
    ANSI

    9.6 Buffer Best Practices

    1. Choose Appropriate Buffer Type

    # Use memory buffer for:
    # - High-performance scenarios
    # - Non-critical data
    # - Development environments
    
    # Use file buffer for:
    # - Production environments
    # - Critical data that cannot be lost
    # - When restart durability is required
    ANSI

    2. Size Buffers Appropriately

    # Consider your data volume and patterns
    <buffer>
      # For high-volume logs (>1GB/day)
      chunk_limit_size 32MB
      queue_limit_length 128
    
      # For low-volume logs (<100MB/day)
      chunk_limit_size 8MB
      queue_limit_length 32
    </buffer>
    ANSI

    3. Monitor Buffer Health

    # Set up alerts for:
    # - Buffer queue length > 80% of limit
    # - Retry count increasing
    # - Buffer overflow events
    # - Flush failures
    ANSI
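
    If the Prometheus sources from section 9.3 are enabled, the same signals are available on the metrics endpoint; a quick sketch (exact metric names depend on the fluent-plugin-prometheus version, so adjust the grep pattern as needed):

    curl -s http://localhost:24231/metrics | grep -E 'buffer_queue_length|buffer_total|retry_count'
    Bash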

    4. Plan for Failure Scenarios

    <match critical.**>
      @type elasticsearch
    
      <buffer>
        # Primary buffer configuration
        @type file
        path /var/log/fluent/buffer/critical
      </buffer>
    
      # Secondary output for failures
      <secondary>
        @type file
        path /var/log/fluent/failed/critical.log
      </secondary>
    </match>
    ANSI

    10. Performance Optimization

    Overview

    Optimizing Fluentd performance involves tuning multiple components: inputs, parsing, filtering, buffering, and outputs.

    graph TD
        A[Performance Bottlenecks] --> B[CPU Usage]
        A --> C[Memory Usage]
        A --> D[I/O Operations]
        A --> E[Network Latency]
    
        B --> F[Parser Optimization]
        B --> G[Filter Reduction]
        C --> H[Buffer Tuning]
        C --> I[Memory Management]
        D --> J[File Operations]
        D --> K[Disk Storage]
        E --> L[Batch Processing]
        E --> M[Connection Pooling]
    
        style A fill:#ffebee
        style B fill:#e8f5e8
        style C fill:#e3f2fd
        style D fill:#fff3e0
        style E fill:#f3e5f5

    10.1 System-level Optimization

    Operating System Tuning

    # Increase file descriptor limits
    echo "fluentd soft nofile 65536" >> /etc/security/limits.conf
    echo "fluentd hard nofile 65536" >> /etc/security/limits.conf
    
    # TCP buffer tuning
    echo 'net.core.rmem_max = 134217728' >> /etc/sysctl.conf
    echo 'net.core.wmem_max = 134217728' >> /etc/sysctl.conf
    echo 'net.ipv4.tcp_rmem = 4096 65536 134217728' >> /etc/sysctl.conf
    echo 'net.ipv4.tcp_wmem = 4096 65536 134217728' >> /etc/sysctl.conf
    
    # Virtual memory settings
    echo 'vm.max_map_count = 262144' >> /etc/sysctl.conf
    echo 'vm.swappiness = 1' >> /etc/sysctl.conf
    
    # Apply changes
    sysctl -p
    ANSI

    JVM Tuning (for JRuby)

    export JAVA_OPTS="-Xmx2g -Xms2g -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
    ANSI

    Process and Thread Limits

    <system>
      # Worker processes
      workers 4
    
      # Root directory for workers
      root_dir /var/log/fluent
    
      # Log settings
      log_level info
      suppress_repeated_stacktrace true
    
      # Start without any input plugins (only useful when draining leftover buffers)
      # without_source true
    </system>
    ANSI

    10.2 Input Optimization

    Tail Plugin Optimization

    <source>
      @type tail
      path /var/log/app/*.log
      pos_file /var/log/fluent/app.log.pos
      tag app.logs
    
      # Performance tuning
      read_from_head false
      refresh_interval 60
      rotate_wait 5
      enable_watch_timer false          # Disable for high-volume logs
      enable_stat_watcher true          # Use for better inode tracking
      open_on_every_update false        # Keep files open
    
      # Reduce parsing overhead
      <parse>
        @type none                      # Skip parsing if not needed
      </parse>
    
      # Read throughput tuning
      read_lines_limit 1000            # Process more lines per iteration
      read_bytes_limit_per_second -1   # No rate limiting
    </source>
    ANSI

    Forward Plugin Optimization

    <source>
      @type forward
      port 24224
      bind 0.0.0.0
    
      # Connection settings
      linger_timeout 0
      chunk_size_limit 1MB
      chunk_size_warn_limit 1MB
    
      # Skip hostname resolution
      resolve_hostname false
    
      # Allow keepalive connections (avoids reconnect overhead)
      deny_keepalive false
    </source>
    ANSI

    HTTP Plugin Optimization

    <source>
      @type http
      port 9880
      bind 0.0.0.0
    
      # Performance settings
      body_size_limit 32m
      keepalive_timeout 10s
    
      # Disable CORS if not needed
      cors_allow_origins []
    
      # Use efficient parser
      <parse>
        @type json
        json_parser oj                  # Faster JSON parser
      </parse>
    </source>
    ANSI

    10.3 Parser Optimization

    Choose Efficient Parsers

    # Fast: Pre-structured formats
    <parse>
      @type json
      json_parser oj                    # Fastest JSON parser
    </parse>
    
    # Medium: Simple regex
    <parse>
      @type regexp
      expression /^(?<time>\S+) (?<level>\S+) (?<message>.*)$/
      time_format %Y-%m-%dT%H:%M:%S%z
    </parse>
    
    # Slow: Complex regex or multi-format
    <parse>
      @type multi_format                # Avoid if possible
      <pattern>
        format json
      </pattern>
      <pattern>
        format apache2
      </pattern>
    </parse>
    ANSI
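
    To see why oj is recommended, here is a rough, self-contained benchmark sketch comparing the stdlib JSON parser with Oj on a typical log line (assumes the oj gem is installed; absolute numbers vary by machine and payload):

    require 'benchmark'
    require 'json'
    require 'oj'    # gem install oj

    # Parse the same log line repeatedly with both parsers and compare timings.
    line = '{"time":"2024-01-01T00:00:00Z","level":"info","message":"request completed","status":200}'
    n = 100_000

    Benchmark.bm(12) do |x|
      x.report('JSON.parse:') { n.times { JSON.parse(line) } }
      x.report('Oj.load:')    { n.times { Oj.load(line) } }
    end
    Ruby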

    Optimize Regular Expressions

    # Good: Specific and anchored
    expression /^(?<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z) \[(?<level>\w+)\] (?<message>.+)$/
    
    # Bad: Greedy and unanchored
    expression /.*(\d{4}-\d{2}-\d{2}.*) \[(.*)\] (.*)$/
    
    # Better: Non-greedy when needed
    expression /^(?<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z) \[(?<level>\w+?)\] (?<message>.+)$/
    ANSI
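
    The cost difference is easy to measure directly; a small sketch that times the anchored pattern against the greedy, unanchored one on a sample line (illustrative only):

    require 'benchmark'

    # Time an anchored, specific pattern against a greedy, unanchored one.
    line = '2024-01-01T12:00:00.123Z [INFO] request completed in 12ms'

    anchored = /^(?<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z) \[(?<level>\w+)\] (?<message>.+)$/
    greedy   = /.*(\d{4}-\d{2}-\d{2}.*) \[(.*)\] (.*)$/
    n = 100_000

    Benchmark.bm(10) do |x|
      x.report('anchored:') { n.times { anchored.match(line) } }
      x.report('greedy:')   { n.times { greedy.match(line) } }
    end
    Ruby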

    Skip Unnecessary Parsing

    # Parse only when needed
    <source>
      @type tail
      path /var/log/app.log
      tag app.raw
    
      <parse>
        @type none                      # Don't parse at input
      </parse>
    </source>
    
    # Parse later, only for specific outputs
    <filter app.raw>
      @type parser
      key_name message
      reserve_data true
    
      <parse>
        @type json
      </parse>
    </filter>
    ANSI

    10.4 Filter Optimization

    Minimize Filter Usage

    # Bad: Multiple separate filters
    <filter app.**>
      @type record_transformer
      <record>
        hostname ${hostname}
      </record>
    </filter>
    
    <filter app.**>
      @type record_transformer
      <record>
        timestamp ${time}
      </record>
    </filter>
    
    # Good: Single combined filter
    <filter app.**>
      @type record_transformer
      <record>
        hostname ${hostname}
        timestamp ${time}
      </record>
    </filter>
    ANSI

    Optimize Record Transformer

    <filter app.**>
      @type record_transformer
      enable_ruby true
    
      # Cache expensive operations
      <record>
        # Good: Simple operations
        hostname ${hostname}
        tag ${tag}
    
        # Avoid: Complex computations in each event
        # slow_operation ${record["data"].to_json.length}
    
        # Better: Pre-compute when possible
        service_name myapp
        version 1.0.0
      </record>
    
      # Remove unnecessary fields early
      remove_keys password,secret,internal_field
    </filter>
    ANSI

    Use Grep for Early Filtering

    # Filter out unnecessary events early
    <filter app.**>
      @type grep
      <exclude>
        key path
        pattern ^/health$
      </exclude>
    </filter>
    
    # This reduces load on downstream filters and outputs
    <filter app.**>
      @type record_transformer
      # ... expensive operations
    </filter>
    ANSI

    10.5 Buffer Optimization

    High-throughput Buffer Configuration

    <match app.**>
      @type elasticsearch
    
      <buffer>
        @type file
        path /var/log/fluent/buffer/high-throughput
    
        # Large chunks for fewer requests
        chunk_limit_size 64MB
        chunk_limit_records 100000
    
        # Parallel processing
        flush_thread_count 8
    
        # Large queue
        queue_limit_length 512
        total_limit_size 2GB            # Overall buffer size cap
    
        # Immediate flushing
        flush_mode immediate
    
        # Handle overflow
        overflow_action drop_oldest_chunk
    
        # Optimized retries
        retry_type exponential_backoff
        retry_wait 1s
        retry_max_interval 30s
        retry_max_times 5
      </buffer>
    </match>
    ANSI

    Memory vs File Buffer Trade-offs

    # High-performance, low-durability
    <buffer>
      @type memory
      chunk_limit_size 16MB
      queue_limit_length 64
      flush_mode immediate
    </buffer>
    
    # Balanced performance and durability
    <buffer>
      @type file
      path /var/log/fluent/buffer
      chunk_limit_size 32MB
      queue_limit_length 128
      flush_mode interval
      flush_interval 5s
    </buffer>
    
    # High-durability, lower performance
    <buffer>
      @type file
      path /var/log/fluent/buffer
      chunk_limit_size 8MB
      queue_limit_length 32
      flush_mode interval
      flush_interval 30s
      flush_at_shutdown true
    </buffer>
    ANSI
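
    A quick way to sanity-check these profiles is to compute the worst-case space each can hold (roughly chunk_limit_size × queue_limit_length, ignoring metadata overhead); a minimal sketch:

    # Worst-case buffered megabytes per profile: chunk_limit_size * queue_limit_length.
    profiles = {
      'memory (high-performance)' => { chunk_mb: 16, queue: 64 },
      'file (balanced)'           => { chunk_mb: 32, queue: 128 },
      'file (high-durability)'    => { chunk_mb: 8,  queue: 32 },
    }

    profiles.each do |name, p|
      max_mb = p[:chunk_mb] * p[:queue]
      printf("%-28s up to %5d MB of buffered data\n", name, max_mb)
    end
    Ruby

    Use the result to size the disk behind the buffer path (file buffers) or to confirm the worst case fits in available RAM (memory buffers).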

    10.6 Output Optimization

    Elasticsearch Optimization

    <match app.**>
      @type elasticsearch
      host elasticsearch.example.com
      port 9200
    
      # Bulk settings for performance
      bulk_message_request_threshold 5000
      bulk_message_flush_interval 5s
    
      # Connection optimization
      reload_connections false
      reload_after 100
    
      # Disable expensive operations
      include_tag_key false
      include_timestamp false
    
      # Template optimization
      template_overwrite false
    
      # Use efficient formatting
      <format>
        @type json
      </format>
    
      <buffer>
        @type file
        path /var/log/fluent/buffer/elasticsearch
        chunk_limit_size 32MB
        flush_thread_count 4
        flush_mode immediate
      </buffer>
    </match>
    ANSI

    Kafka Optimization

    <match app.**>
      @type kafka2
    
      # Broker settings
      brokers kafka1:9092,kafka2:9092,kafka3:9092
      default_topic app-logs
    
      # Producer optimization
      required_acks 1                   # Don't wait for all replicas
      compression_codec snappy          # Fast compression
      batch_size 16384                  # Larger batches
      linger_ms 5                       # Small delay for batching
    
      # Partitioning
      partition_key_key hostname        # Distribute load
    
      <buffer>
        @type memory
        chunk_limit_size 16MB
        flush_mode immediate
        flush_thread_count 4
      </buffer>
    </match>
    ANSI

    S3 Optimization

    <match app.**>
      @type s3
    
      aws_key_id YOUR_KEY
      aws_sec_key YOUR_SECRET
      s3_region us-west-2
      s3_bucket app-logs
    
      # Object key layout
      s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
    
      # Compression
      store_as gzip
    
      # Storage class
      storage_class STANDARD_IA         # Cheaper for infrequent access
    
      <buffer time>
        @type file
        path /var/log/fluent/buffer/s3
        timekey 3600                    # 1 hour chunks
        timekey_wait 300                # 5 minute wait
        chunk_limit_size 256MB          # Large chunks
        flush_thread_count 2
      </buffer>
    </match>
    ANSI


    10.7 Monitoring Performance

    Performance Metrics

    # Enable metrics collection
    <system>
      enable_input_metrics true
      enable_size_metrics true
    </system>
    
    # Monitor agent
    <source>
      @type monitor_agent
      bind 0.0.0.0
      port 24220
      include_config false
    </source>
    
    # Prometheus metrics
    <source>
      @type prometheus
      bind 0.0.0.0
      port 24231
      metrics_path /metrics
    </source>
    
    <source>
      @type prometheus_output_monitor
      interval 10
      <labels>
        hostname ${hostname}
        service fluentd
      </labels>
    </source>
    ANSI

    Key Performance Indicators

    graph TB
        A[Performance Metrics] --> B[Throughput]
        A --> C[Latency] 
        A --> D[Resource Usage]
        A --> E[Error Rates]
    
        B --> F[Events/sec]
        B --> G[Bytes/sec]
        C --> H[Processing Time]
        C --> I[Buffer Wait Time]
        D --> J[CPU Usage]
        D --> K[Memory Usage]
        D --> L[Disk I/O]
        E --> M[Parse Errors]
        E --> N[Output Failures]
    
        style A fill:#e3f2fd
        style B fill:#e8f5e8
        style C fill:#fff3e0
        style D fill:#f3e5f5
        style E fill:#ffebee

    10.8 Performance Best Practices

    1. Profile Your Configuration

    # Use Fluentd's profiling tools
    fluentd -c fluent.conf --dry-run    # Validate configuration without starting
    fluentd -c fluent.conf -vv          # Run with trace-level logging
    
    # Monitor system resources
    top -p $(pgrep fluentd)
    iotop -p $(pgrep fluentd)
    Bash

    2. Optimize Data Flow

    # Bad: Inefficient pipeline (enriches every event, then discards most of them)
    <source>
      @type tail
      path /var/log/app.log
      pos_file /var/log/fluent/app.log.pos
      tag app.logs
      <parse>
        @type json
      </parse>
    </source>

    <filter app.logs>
      @type record_transformer
      <record>
        hostname ${hostname}
      </record>
    </filter>

    <filter app.logs>
      @type grep
      <regexp>
        key level
        pattern ERROR
      </regexp>
    </filter>

    # Good: Optimized pipeline (grep first, then enrich only the events that survive)
    <source>
      @type tail
      path /var/log/app.log
      pos_file /var/log/fluent/app.log.pos
      tag app.logs
      <parse>
        @type json
      </parse>
    </source>

    <filter app.logs>
      @type grep
      <regexp>
        key level
        pattern ERROR
      </regexp>
    </filter>

    <filter app.logs>
      @type record_transformer
      <record>
        hostname ${hostname}
      </record>
    </filter>
    ANSI

    11. Monitoring and Troubleshooting

    Overview

    Effective monitoring and troubleshooting are essential for maintaining a healthy Fluentd deployment.

    graph TD
        A[Monitoring Strategy] --> B[System Metrics]
        A --> C[Application Metrics]
        A --> D[Log Analysis]
        A --> E[Alerting]
    
        B --> F[CPU/Memory/Disk]
        B --> G[Network I/O]
        C --> H[Buffer Status]
        C --> I[Event Throughput]
        C --> J[Error Rates]
        D --> K[Fluentd Logs]
        D --> L[Output Validation]
        E --> M[Threshold Alerts]
        E --> N[Anomaly Detection]
    
        style A fill:#e3f2fd
        style B fill:#e8f5e8
        style C fill:#fff3e0
        style D fill:#f3e5f5
        style E fill:#ffebee

    11.1 Built-in Monitoring

    Monitor Agent Plugin

    <source>
      @type monitor_agent
      bind 0.0.0.0
      port 24220
      include_config false
      include_retry false
    </source>
    ANSI

    Query monitor agent:

    # Get plugin information
    curl http://localhost:24220/api/plugins.json
    
    # Get specific plugin info
    curl http://localhost:24220/api/plugins/object:3fd8d4d4d638.json
    
    # Get config dump
    curl http://localhost:24220/api/config.json
    Bash

    Debug Event Dump

    <source>
      @type debug_agent
      bind 0.0.0.0
      port 24230
    </source>
    ANSI

    11.2 Metrics Collection

    Prometheus Integration

    # Install prometheus plugin
    gem install fluent-plugin-prometheus
    
    # Configuration
    <source>
      @type prometheus
      bind 0.0.0.0
      port 24231
      metrics_path /metrics
    </source>
    
    <source>
      @type prometheus_monitor
      interval 10
      <labels>
        hostname ${hostname}
        service fluentd
      </labels>
    </source>
    
    <source>
      @type prometheus_output_monitor
      interval 10
      <labels>
        hostname ${hostname}
      </labels>
    </source>
    
    <source>
      @type prometheus_tail_monitor
      interval 10
      <labels>
        hostname ${hostname}
      </labels>
    </source>
    ANSI

    Custom Metrics

    <filter app.**>
      @type prometheus
      <metric>
        name fluentd_input_status_num_records_total
        type counter
        desc The total number of incoming records
        <labels>
          tag ${tag}
          hostname ${hostname}
        </labels>
      </metric>
    
      <metric>
        name fluentd_input_status_num_bytes_total
        type counter
        desc The total bytes of incoming records
        key size
        <labels>
          tag ${tag}
          hostname ${hostname}
        </labels>
      </metric>
    </filter>
    ANSI

    11.3 Health Checks

    HTTP Health Check Endpoint

    <source>
      @type http
      port 9880
      bind 0.0.0.0
    
      <transport tls>
        cert_path /path/to/cert.pem
        private_key_path /path/to/key.pem
      </transport>
    </source>
    
    # Add health check route
    <match fluentd.health>
      @type null
    </match>
    ANSI

    Health check script:

    #!/bin/bash
    # health_check.sh
    
    # Check if Fluentd is running
    if ! pgrep -f fluentd > /dev/null; then
        echo "CRITICAL: Fluentd process not running"
        exit 2
    fi
    
    # Check HTTP endpoint
    if ! curl -f http://localhost:24220/api/plugins.json > /dev/null 2>&1; then
        echo "CRITICAL: Monitor agent not responding"
        exit 2
    fi
    
    # Check buffer sizes
    BUFFER_SIZE=$(curl -s http://localhost:24220/api/plugins.json | jq '.plugins[] | select(.type=="output") | .buffer_queue_length' | awk '{sum+=$1} END {print sum}')
    
    if [ "$BUFFER_SIZE" -gt 1000 ]; then
        echo "WARNING: High buffer queue length: $BUFFER_SIZE"
        exit 1
    fi
    
    echo "OK: Fluentd is healthy"
    exit 0
    Bash

    11.4 Log Analysis

    Fluentd Internal Logging

    <system>
      log_level info

      # Internal log format and rotation
      <log>
        format json
        time_format %Y-%m-%d %H:%M:%S %z
        rotate_age 7
        rotate_size 10MB
      </log>

      # Suppress repeated stack traces
      suppress_repeated_stacktrace true
      emit_error_log_interval 30s
      suppress_config_dump true
    </system>
    ANSI

    Important Log Patterns

    # Monitor for these patterns in Fluentd logs:
    
    # Buffer overflow
    grep "buffer overflow" /var/log/fluent/fluentd.log
    
    # Parse failures  
    grep "parse failed" /var/log/fluent/fluentd.log
    
    # Output failures
    grep "failed to flush" /var/log/fluent/fluentd.log
    
    # Connection issues
    grep "connection refused\|timeout" /var/log/fluent/fluentd.log
    
    # Memory issues
    grep "out of memory\|memory exhausted" /var/log/fluent/fluentd.log
    Bash

    11.5 Common Issues and Solutions

    Issue 1: High Memory Usage

    graph TD
        A[High Memory Usage] --> B{Check Buffer Type}
        B -->|Memory Buffer| C[Switch to File Buffer]
        B -->|File Buffer| D[Check Buffer Size]
        D --> E[Reduce chunk_limit_size]
        D --> F[Reduce queue_limit_length]
        C --> G[Monitor Memory]
        E --> G
        F --> G
    
        style A fill:#ffebee
        style G fill:#e8f5e8

    Symptoms:

    • Fluentd process consuming excessive RAM
    • System becoming unresponsive
    • Out of memory errors in logs

    Diagnosis:

    # Check memory usage
    ps aux | grep fluentd
    top -p $(pgrep fluentd)
    
    # Check buffer statistics
    curl http://localhost:24220/api/plugins.json | jq '.plugins[] | select(.buffer_queue_length > 0)'
    Bash

    Solution:

    # Replace memory buffer with file buffer
    <match app.**>
      @type elasticsearch
    
      <buffer>
        @type file
        path /var/log/fluent/buffer
        chunk_limit_size 8MB          # Reduce from 32MB
        queue_limit_length 32         # Reduce from 128
        flush_mode interval
        flush_interval 10s
      </buffer>
    </match>
    ANSI

    Issue 2: Buffer Overflow

    Symptoms:

    • “buffer space has too many data” errors
    • “buffer overflow” in logs
    • Events being dropped

    Root Causes:

    • Downstream system too slow
    • Network connectivity issues
    • Insufficient buffer capacity

    Diagnosis:

    # Check buffer queue status
    curl -s http://localhost:24220/api/plugins.json | \
    jq '.plugins[] | select(.type=="output") | {plugin_id, buffer_queue_length, buffer_total_queued_size}'
    
    # Monitor buffer growth
    watch -n 5 'curl -s http://localhost:24220/api/plugins.json | jq ".plugins[] | select(.type==\"output\") | .buffer_queue_length"'
    Bash

    Solutions:

    # Immediate fixes
    <match app.**>
      @type elasticsearch
    
      <buffer>
        # Increase buffer capacity
        queue_limit_length 256        # Increase queue size
        chunk_limit_size 32MB         # Larger chunks
    
        # Faster flushing
        flush_mode immediate
        flush_thread_count 4          # More parallel threads
    
        # Handle overflow gracefully
        overflow_action drop_oldest_chunk
      </buffer>
    </match>
    
    # Long-term solutions
    <match app.**>
      @type copy
    
      # Primary output with circuit breaker
      <store>
        @type elasticsearch
        host primary-es.example.com
    
        <buffer>
          retry_max_times 3
          retry_timeout 30s
        </buffer>
    
        <secondary>
          @type relabel
          @label @FALLBACK
        </secondary>
      </store>
    </match>
    
    <label @FALLBACK>
      <match **>
        @type s3
        s3_bucket backup-logs
        path fallback/
      </match>
    </label>
    ANSI

    Issue 3: Parse Failures

    Symptoms:

    • “failed to parse” errors
    • Missing fields in output
    • Events with unexpected format

    Common Scenarios:

    1. JSON Parse Errors:
    # Problem: Malformed JSON
    # Input: {"message": "Hello world", "level": INFO}  # Missing quotes
    
    # Solution: Use the multi_format parser with a fallback pattern

    <source>
      @type tail
      path /var/log/app.log
      tag app.logs

      <parse>
        @type multi_format

        <pattern>
          format json
        </pattern>

        <pattern>
          format regexp
          expression /^(?<message>.*)$/
        </pattern>
      </parse>
    </source>
    ANSI
    2. Regex Parse Errors:
    # Problem: Regex doesn't match the log format
    # Solution: Test and fix the regex (see the quick Ruby check below)

    <source>
      @type tail
      path /var/log/nginx/access.log
      tag nginx.access

      <parse>
        @type regexp
        # Validate the pattern against real samples before deploying
        expression /^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)/
        time_format %d/%b/%Y:%H:%M:%S %z
      </parse>
    </source>
    ANSI
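
    Before deploying, the pattern above can be checked against a real sample line with a few lines of Ruby (the sample line is illustrative):

    # Verify the nginx access-log regex against a sample line before shipping it.
    pattern = /^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)/
    sample  = '192.0.2.1 - alice [01/Jan/2024:12:00:00 +0000] "GET /index.html HTTP/1.1" 200 1234'

    if (m = pattern.match(sample))
      m.named_captures.each { |key, value| puts "#{key}: #{value}" }
    else
      puts 'NO MATCH - fix the expression before deploying'
    end
    Ruby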
    3. Handle Parse Errors:
    # Parse with the parser filter and route unparseable records
    # to the @ERROR label instead of dropping them
    <filter app.logs>
      @type parser
      key_name message
      reserve_data true
      emit_invalid_record_to_error true

      <parse>
        @type json
      </parse>
    </filter>

    # Handle parse errors
    <label @ERROR>
      <match **>
        @type file
        path /var/log/fluent/parse_errors.log

        <format>
          @type json
        </format>
      </match>
    </label>
    ANSI

    Issue 4: Output Failures

    Symptoms:

    • “failed to flush” errors
    • Events not reaching destination
    • Retries exhausted

    Elasticsearch Connection Issues:

    # Problem: Connection refused, timeout
    # Diagnosis commands:
    # curl -X GET "elasticsearch.example.com:9200/_cluster/health"
    # telnet elasticsearch.example.com 9200
    
    <match app.**>
      @type elasticsearch
      host elasticsearch.example.com
      port 9200
    
      # Connection tuning
      request_timeout 60s
      reload_connections true
      reload_after 500
    
      # Retry configuration
      <buffer>
        retry_type exponential_backoff
        retry_wait 1s
        retry_max_interval 60s
        retry_max_times 10
        retry_forever false
        retry_timeout 24h
      </buffer>
    
      # Secondary output for failures
      <secondary>
        @type file
        path /var/log/fluent/failed_elasticsearch.log
    
        <format>
          @type json
        </format>
      </secondary>
    </match>
    ANSI

    Issue 5: Performance Degradation

    Symptoms:

    • High CPU usage
    • Slow log processing
    • Growing buffer queues

    Performance Tuning Checklist:

    1. Input Optimization:
    <source>
      @type tail
      path /var/log/app/*.log
      pos_file /var/log/fluent/app.log.pos
      tag app.logs

      # Optimize for performance
      read_from_head false
      refresh_interval 60
      rotate_wait 5
      enable_watch_timer false      # Disable for high-volume
      enable_stat_watcher true      # Better inode tracking
      open_on_every_update false    # Keep files open
      read_lines_limit 1000         # Process more lines per batch
    </source>
    ANSI
    2. Filter Optimization:
    # Bad: Multiple separate filters
    <filter app.**>
      @type record_transformer
      <record>
        hostname ${hostname}
      </record>
    </filter>
    
    <filter app.**>
      @type record_transformer
      <record>
        timestamp ${time}
      </record>
    </filter>
    
    # Good: Combined filter
    
    <filter app.**>
      @type record_transformer
      <record>
        hostname ${hostname}
        timestamp ${time}
      </record>
    </filter>
    ANSI

    3. Output Optimization:

    <match app.**>
      @type elasticsearch
    
      # Bulk settings
      bulk_message_request_threshold 5000
      bulk_message_flush_interval 5s
    
      # Buffer tuning
      <buffer>
        @type file
        chunk_limit_size 32MB       # Larger chunks
        flush_thread_count 4        # Parallel processing
        flush_mode immediate        # Faster processing
      </buffer>
    </match>
    ANSI

    Issue 6: SSL/TLS Certificate Problems

    Symptoms:

    • “certificate verify failed” errors
    • SSL handshake failures
    • Connection refused with SSL

    Solutions:

    # Skip SSL verification (not recommended for production)
    <match secure.**>
      @type elasticsearch
      scheme https
      ssl_verify false
    </match>
    
    # Proper SSL configuration
    <match secure.**>
      @type elasticsearch
      scheme https
      ssl_verify true
      ca_file /etc/ssl/certs/ca-certificates.crt
      client_cert /etc/fluent/certs/client.crt
      client_key /etc/fluent/certs/client.key
    </match>
    
    # Debug SSL issues
    <match secure.**>
      @type elasticsearch
      scheme https
      ssl_verify true
      ssl_version TLSv1_2
      log_es_400_reason true
    </match>
    ANSI

    11.6 Troubleshooting Workflow

    Step-by-Step Debugging Process

    1. Identify the Problem:
    # Check Fluentd status
    systemctl status fluentd

    # Check logs
    tail -f /var/log/fluent/fluentd.log

    # Check configuration
    fluentd -c /etc/fluent/fluent.conf --dry-run
    Bash

    2. Gather Information:

    # System resources
    top -p $(pgrep fluentd)
    df -h /var/log/fluent
    
    # Network connectivity
    nc -zv elasticsearch.example.com 9200
    ping elasticsearch.example.com
    
    # Plugin status
    curl http://localhost:24220/api/plugins.json | jq '.'
    Bash
    3. Isolate the Issue:
    # Minimal test configuration
    <source>
      @type forward
      port 24224
    </source>
    
    <match **>
      @type stdout
    </match>
    ANSI

    4. Enable Debug Logging:

    <system>
      log_level debug
      suppress_repeated_stacktrace false
    </system>
    ANSI
    5. Test Components Individually:
    # Test input
    echo '{"test":"message"}' | fluent-cat test.debug

    # Test parsing
    echo 'test log line' > /tmp/test.log
    # Configure a tail input to read from /tmp/test.log

    # Test output
    # Use a stdout output to verify data flow
    # (see the fluent-logger sketch below for a scripted alternative)
    Bash
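
    For scripted end-to-end tests, events can also be pushed into the forward input from Ruby using the fluent-logger gem (assumes gem install fluent-logger and an in_forward source listening on localhost:24224):

    require 'fluent-logger'    # gem install fluent-logger

    # Send a test event to the in_forward source on localhost:24224.
    log = Fluent::Logger::FluentLogger.new(nil, host: 'localhost', port: 24224)

    ok = log.post('test.debug', 'message' => 'hello from fluent-logger', 'level' => 'info')
    puts ok ? 'event accepted' : 'event not delivered - check connectivity'
    Ruby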

    11.7 Performance Monitoring and Alerting

    Key Metrics Dashboard

    # Prometheus metrics configuration
    <source>
      @type prometheus
      bind 0.0.0.0
      port 24231
      metrics_path /metrics
    </source>
    
    <source>
      @type prometheus_monitor
      interval 10
      <labels>
        hostname ${hostname}
        service fluentd
      </labels>
    </source>
    
    <source>
      @type prometheus_output_monitor
      interval 10
      <labels>
        hostname ${hostname}
      </labels>
    </source>
    ANSI

    Critical Alerts Configuration

    # prometheus-alerts.yml
    groups:
    - name: fluentd.rules
      rules:
      - alert: FluentdDown
        expr: up{job="fluentd"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Fluentd instance {{ $labels.instance }} is down"
          description: "Fluentd has been down for more than 1 minute"
    
      - alert: FluentdHighBufferUsage
        expr: fluentd_status_buffer_queue_length > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High buffer usage on {{ $labels.instance }}"
          description: "Buffer queue length is {{ $value }}"
    
      - alert: FluentdHighErrorRate
        expr: rate(fluentd_status_retry_count[5m]) > 10
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High error rate on {{ $labels.instance }}"
          description: "Error rate is {{ $value }} errors/second"
    
      - alert: FluentdNoDataFlow
        expr: rate(fluentd_input_status_num_records_total[5m]) == 0
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "No data flowing through {{ $labels.instance }}"
          description: "No records processed in the last 10 minutes"
    YAML

    Health Check Script

    #!/bin/bash
    # comprehensive_health_check.sh
    
    set -e
    
    FLUENTD_HOST="localhost"
    MONITOR_PORT="24220"
    HEALTH_URL="http://${FLUENTD_HOST}:${MONITOR_PORT}/api/plugins.json"
    LOG_FILE="/var/log/fluent/fluentd.log"
    BUFFER_DIR="/var/log/fluent/buffer"
    
    # Colors for output
    RED='\033[0;31m'
    YELLOW='\033[1;33m'
    GREEN='\033[0;32m'
    NC='\033[0m' # No Color
    
    # Function to print colored output
    print_status() {
        local status=$1
        local message=$2
        case $status in
            "OK")
                echo -e "${GREEN}✓ OK${NC}: $message"
                ;;
            "WARNING")
                echo -e "${YELLOW}⚠ WARNING${NC}: $message"
                ;;
            "CRITICAL")
                echo -e "${RED}✗ CRITICAL${NC}: $message"
                ;;
        esac
    }
    
    # Check if Fluentd process is running
    check_process() {
        if pgrep -f fluentd > /dev/null; then
            print_status "OK" "Fluentd process is running"
            return 0
        else
            print_status "CRITICAL" "Fluentd process not found"
            return 2
        fi
    }
    
    # Check HTTP API endpoint
    check_api() {
        if curl -f -s "$HEALTH_URL" > /dev/null; then
            print_status "OK" "Monitor agent responding"
            return 0
        else
            print_status "CRITICAL" "Monitor agent not responding"
            return 2
        fi
    }
    
    # Check buffer health
    check_buffers() {
        local buffer_data
        buffer_data=$(curl -s "$HEALTH_URL" 2>/dev/null)
    
        if [ $? -ne 0 ]; then
            print_status "CRITICAL" "Cannot retrieve buffer information"
            return 2
        fi
    
        local total_queue_length
        total_queue_length=$(echo "$buffer_data" | jq '.plugins[] | select(.buffer_queue_length != null) | .buffer_queue_length' | awk '{sum+=$1} END {print sum+0}')
    
        if [ "$total_queue_length" -gt 1000 ]; then
            print_status "CRITICAL" "High buffer queue length: $total_queue_length"
            return 2
        elif [ "$total_queue_length" -gt 500 ]; then
            print_status "WARNING" "Moderate buffer queue length: $total_queue_length"
            return 1
        else
            print_status "OK" "Buffer queue length: $total_queue_length"
            return 0
        fi
    }
    
    # Check recent errors
    check_errors() {
        if [ ! -f "$LOG_FILE" ]; then
            print_status "WARNING" "Log file not found: $LOG_FILE"
            return 1
        fi
    
        local error_count
        error_count=$(tail -n 1000 "$LOG_FILE" | grep -c "\[error\]" || true)
    
        if [ "$error_count" -gt 10 ]; then
            print_status "CRITICAL" "High error count in recent logs: $error_count"
            return 2
        elif [ "$error_count" -gt 5 ]; then
            print_status "WARNING" "Moderate error count in recent logs: $error_count"
            return 1
        else
            print_status "OK" "Error count in recent logs: $error_count"
            return 0
        fi
    }
    
    # Check disk space
    check_disk_space() {
        if [ ! -d "$BUFFER_DIR" ]; then
            print_status "WARNING" "Buffer directory not found: $BUFFER_DIR"
            return 1
        fi
    
        local usage
        usage=$(df "$BUFFER_DIR" | awk 'NR==2 {print $5}' | sed 's/%//')
    
        if [ "$usage" -gt 90 ]; then
            print_status "CRITICAL" "High disk usage: ${usage}%"
            return 2
        elif [ "$usage" -gt 80 ]; then
            print_status "WARNING" "Moderate disk usage: ${usage}%"
            return 1
        else
            print_status "OK" "Disk usage: ${usage}%"
            return 0
        fi
    }
    
    # Check memory usage
    check_memory() {
        local memory_usage
        memory_usage=$(ps -p $(pgrep fluentd) -o %mem --no-headers | awk '{print int($1)}')
    
        if [ "$memory_usage" -gt 80 ]; then
            print_status "CRITICAL" "High memory usage: ${memory_usage}%"
            return 2
        elif [ "$memory_usage" -gt 60 ]; then
            print_status "WARNING" "Moderate memory usage: ${memory_usage}%"
            return 1
        else
            print_status "OK" "Memory usage: ${memory_usage}%"
            return 0
        fi
    }
    
    # Main execution
    main() {
        echo "=== Fluentd Health Check ==="
        echo "Timestamp: $(date)"
        echo "Host: $(hostname)"
        echo ""
    
        local exit_code=0
    
        check_process || exit_code=$?
        check_api || exit_code=$?
        check_buffers || exit_code=$?
        check_errors || exit_code=$?
        check_disk_space || exit_code=$?
        check_memory || exit_code=$?
    
        echo ""
        case $exit_code in
            0)
                echo -e "${GREEN}Overall Status: HEALTHY${NC}"
                ;;
            1)
                echo -e "${YELLOW}Overall Status: WARNING${NC}"
                ;;
            2)
                echo -e "${RED}Overall Status: CRITICAL${NC}"
                ;;
        esac
    
        exit $exit_code
    }
    
    main "$@"
    Bash


    11.8 Alerting Setup

    Prometheus Alerting Rules

    # fluentd_alerts.yml
    groups:
    - name: fluentd
      rules:
      - alert: FluentdDown
        expr: up{job="fluentd"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Fluentd instance is down"
    
      - alert: FluentdHighBufferUsage
        expr: fluentd_status_buffer_queue_length > 100
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Fluentd buffer queue is high"
    
      - alert: FluentdParseErrors
        expr: rate(fluentd_input_status_num_records_total[5m]) == 0
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Fluentd is not processing any records"
    YAML

    Custom Health Check

    <source>
      @type exec
      command /usr/local/bin/fluentd_health_check.sh
      format json
      tag fluentd.health
      run_interval 60s
    </source>
    
    <match fluentd.health>
      @type copy

      # Send to monitoring system
      <store>
        @type elasticsearch
        index_name fluentd-health
      </store>

      # Route failures to the alerting pipeline
      <store>
        @type relabel
        @label @HEALTH_ALERT
      </store>
    </match>

    <label @HEALTH_ALERT>
      # Only alert on failed health checks
      <filter fluentd.health>
        @type grep
        <regexp>
          key status
          pattern ERROR
        </regexp>
      </filter>

      <match fluentd.health>
        @type slack
        webhook_url YOUR_WEBHOOK_URL
        channel ops-alerts
      </match>
    </label>
    ANSI

    11.9 Performance Monitoring Dashboard

    Grafana Dashboard Example

    {
      "dashboard": {
        "title": "Fluentd Monitoring",
        "panels": [
          {
            "title": "Events per Second",
            "type": "graph",
            "targets": [
              {
                "expr": "rate(fluentd_input_status_num_records_total[1m])",
                "legendFormat": "{{tag}}"
              }
            ]
          },
          {
            "title": "Buffer Queue Length",
            "type": "graph", 
            "targets": [
              {
                "expr": "fluentd_status_buffer_queue_length",
                "legendFormat": "{{tag}}"
              }
            ]
          },
          {
            "title": "Memory Usage",
            "type": "graph",
            "targets": [
              {
                "expr": "process_resident_memory_bytes{job=\"fluentd\"}",
                "legendFormat": "Memory Usage"
              }
            ]
          }
        ]
      }
    }
    JSON

    12. Advanced Patterns

    12.1 Multi-tenant Logging

    Handle logs from multiple tenants with isolation and routing.

    graph TD
        A[Tenant A Logs] --> B[Fluentd Router]
        C[Tenant B Logs] --> B
        D[Tenant C Logs] --> B
    
        B --> E{Tenant Routing}
        E --> F[Tenant A ES Index]
        E --> G[Tenant B ES Index] 
        E --> H[Tenant C ES Index]
    
        B --> I[Shared Analytics]
        B --> J[Audit Logs]
    
        style B fill:#f9f,stroke:#333,stroke-width:2px
    # Multi-tenant configuration
    <source>
      @type forward
      port 24224
      bind 0.0.0.0
    </source>
    
    # Extract tenant from tag or record
    <filter **>
      @type record_transformer
      enable_ruby true
    
      <record>
        tenant_id ${
          if tag.start_with?('tenant.')
            tag.split('.')[1]
          elsif record.has_key?('tenant_id')
            record['tenant_id']
          else
            'default'
          end
        }
      </record>
    </filter>
    
    # Route to tenant-specific outputs
    <match **>
      @type rewrite_tag_filter
    
      <rule>
        key tenant_id
        pattern ^(.+)$
        tag routed.$1.${tag}
      </rule>
    </match>
    
    # Tenant A
    <match routed.tenant-a.**>
      @type elasticsearch
      host es-tenant-a.example.com
      index_name tenant-a-logs
    
      <buffer>
        @type file
        path /var/log/fluent/buffer/tenant-a
      </buffer>
    </match>
    
    # Tenant B  
    <match routed.tenant-b.**>
      @type elasticsearch
      host es-tenant-b.example.com
      index_name tenant-b-logs
    
      <buffer>
        @type file
        path /var/log/fluent/buffer/tenant-b
      </buffer>
    </match>
    
    # Default tenant
    <match routed.default.**>
      @type file
      path /var/log/fluent/default-tenant.log
    </match>
    ANSI

    12.2 Event Enrichment Pipeline

    Comprehensive data enrichment example.

    # Raw log input
    <source>
      @type tail
      path /var/log/nginx/access.log
      tag nginx.access.raw
    
      <parse>
        @type nginx
      </parse>
    </source>
    
    # Step 1: Basic enrichment
    <filter nginx.access.raw>
      @type record_transformer
      enable_ruby true
      <record>
        hostname ${hostname}
        environment production
        service_name nginx
        log_type access
        ingestion_time ${Time.now.utc.iso8601}
      </record>
    </filter>
    
    # Step 2: GeoIP enrichment
    <filter nginx.access.raw>
      @type geoip
      geoip_lookup_keys remote
      <record>
        country_code ${city.country.iso_code}
        country_name ${city.country.names.en}
        city_name ${city.city.names.en}
        latitude ${location.latitude}
        longitude ${location.longitude}
        timezone ${location.time_zone}
      </record>
      skip_adding_null_record false
    </filter>
    
    # Step 3: User agent parsing
    <filter nginx.access.raw>
      @type parser
      key_name agent
      reserve_data true
      inject_key_prefix ua_
    
      <parse>
        @type user_agent
      </parse>
    </filter>
    
    # Step 4: Request classification
    <filter nginx.access.raw>
      @type record_transformer
      enable_ruby true
    
      <record>
        request_type ${
          case record["path"]
          when /^\/api\//
            "api"
          when /^\/static\//
            "static"
          when /^\/admin\//
            "admin"
          else
            "web"
          end
        }
    
        is_bot ${record["ua_name"] =~ /(bot|crawler|spider)/i ? true : false}
    
        response_class ${
          status = record["status"].to_i
          case status
          when 200..299
            "success"
          when 300..399
            "redirect"
          when 400..499
            "client_error"
          when 500..599
            "server_error"
          else
            "unknown"
          end
        }
      </record>
    </filter>
    
    # Step 5: Security analysis
    <filter nginx.access.raw>
      @type record_transformer
      enable_ruby true
    
      <record>
        security_flags ${
          flags = []
    
          # Check for suspicious patterns
          flags << "sql_injection" if record["path"] =~ /('|(union\s+select)|(drop\s+table))/i
          flags << "xss_attempt" if record["path"] =~ /(<script|javascript:|onload=)/i
          flags << "path_traversal" if record["path"] =~ /(\.\.\/|\.\.\\)/
          flags << "large_request" if record["request_length"].to_i > 100000
    
          flags.join(",")
        }
    
        risk_score ${
          score = 0
          score += 10 if record["status"].to_i >= 400
          score += 20 if record["response_time"].to_f > 5.0
          score += 30 if record["security_flags"].length > 0
          score += 5 if record["is_bot"]
          score
        }
      </record>
    </filter>
    
    # Route enriched data
    <match nginx.access.raw>
      @type rewrite_tag_filter
    
      <rule>
        key risk_score
        pattern ^([3-9]\d|\d{3,})$
        tag security.high_risk.nginx.access
      </rule>
    
      <rule>
        key request_type
        pattern api
        tag analytics.api.nginx.access
      </rule>
    
      <rule>
        key request_type
        pattern .*
        tag analytics.web.nginx.access
      </rule>
    </match>
    ANSI
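
    The classification and risk-scoring rules embedded in the ${...} blocks above are easier to read and unit-test as plain Ruby; a sketch of the same logic (field names mirror the nginx record used in this pipeline):

    # Standalone version of the request classification / risk scoring rules above.
    def classify(record)
      request_type =
        case record['path']
        when %r{^/api/}    then 'api'
        when %r{^/static/} then 'static'
        when %r{^/admin/}  then 'admin'
        else                    'web'
        end

      is_bot = !!(record['ua_name'] =~ /(bot|crawler|spider)/i)

      score  = 0
      score += 10 if record['status'].to_i >= 400
      score += 20 if record['response_time'].to_f > 5.0
      score += 5  if is_bot

      { 'request_type' => request_type, 'is_bot' => is_bot, 'risk_score' => score }
    end

    p classify('path' => '/api/users', 'status' => '500',
               'response_time' => '6.2', 'ua_name' => 'curl/8.0')
    # prints roughly: {"request_type"=>"api", "is_bot"=>false, "risk_score"=>30}
    Ruby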

    12.3 Dead Letter Queue Pattern

    Handle failed events with retry and dead letter queues.

    # Main processing pipeline
    <match app.**>
      @type copy
    
      # Primary output
      <store>
        @type elasticsearch
        host elasticsearch.example.com
        index_name app-logs
    
        <buffer>
          @type file
          path /var/log/fluent/buffer/elasticsearch
          retry_type exponential_backoff
          retry_max_times 3
          retry_timeout 1h
        </buffer>
    
        # Send failures to DLQ
        <secondary>
          @type relabel
          @label @DLQ
        </secondary>
      </store>
    </match>
    
    # Dead Letter Queue processing
    <label @DLQ>
      <filter **>
        @type record_transformer
        enable_ruby true
        <record>
          dlq_timestamp ${Time.now.utc.iso8601}
          dlq_reason elasticsearch_failure
          original_tag ${tag}
        </record>
      </filter>
    
      # Store in DLQ
      <match **>
        @type file
        path /var/log/fluent/dlq/failed_events.%Y%m%d.log
    
        <format>
          @type json
        </format>
    
        <buffer time>
          @type file
          path /var/log/fluent/buffer/dlq
          timekey 3600
          timekey_wait 300
        </buffer>
      </match>
    </label>
    
    # DLQ reprocessing job (separate process)
    <source>
      @type tail
      path /var/log/fluent/dlq/failed_events.*.log
      tag dlq.reprocess
    
      <parse>
        @type json
      </parse>
    </source>
    
    <filter dlq.reprocess>
      @type record_transformer
      enable_ruby true
      <record>
        reprocess_attempt ${(record["reprocess_attempt"] || 0).to_i + 1}
        reprocess_timestamp ${Time.now.utc.iso8601}
      </record>
    </filter>
    
    # Retry with exponential backoff
    <match dlq.reprocess>
      @type elasticsearch
      host elasticsearch.example.com
      index_name app-logs-dlq
    
      <buffer>
        @type file
        path /var/log/fluent/buffer/dlq-retry
        retry_type exponential_backoff
        retry_max_times 5
        retry_timeout 24h
      </buffer>
    </match>
    ANSI
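
    Before reprocessing, it helps to see what is actually sitting in the DLQ; a small sketch that tallies failed events by dlq_reason from the JSON-lines files written above (path assumed to match the file output):

    require 'json'

    # Count DLQ events per failure reason from the JSON-lines DLQ files.
    counts = Hash.new(0)

    Dir.glob('/var/log/fluent/dlq/failed_events.*.log').each do |file|
      File.foreach(file) do |line|
        record = JSON.parse(line) rescue next
        counts[record['dlq_reason'] || 'unknown'] += 1
      end
    end

    counts.sort_by { |_, n| -n }.each { |reason, n| printf("%-30s %d\n", reason, n) }
    Ruby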

    12.4 Circuit Breaker Pattern

    Implement circuit breaker for external service protection.

    # Circuit breaker filter
    <filter app.**>
      @type record_transformer
      enable_ruby true
    
      <record>
        circuit_breaker_state ${
          # Simple circuit breaker logic
          error_file = "/tmp/elasticsearch_errors"
          current_time = Time.now.to_i
    
          # Read error count and timestamp
          if File.exist?(error_file)
            lines = File.readlines(error_file)
            last_error_time = lines.last.to_i if lines.any?
            error_count = lines.size
          else
            last_error_time = 0
            error_count = 0
          end
    
          # Circuit breaker states
          if error_count >= 5 && (current_time - last_error_time) < 300
            "open"  # Circuit open, bypass for 5 minutes
          elsif error_count >= 5 && (current_time - last_error_time) >= 300
            "half_open"  # Try again
          else
            "closed"  # Normal operation
          end
        }
      </record>
    </filter>
    
    # Route based on circuit breaker state
    <match app.**>
      @type rewrite_tag_filter
    
      <rule>
        key circuit_breaker_state
        pattern open
        tag circuit_breaker.bypass.${tag}
      </rule>
    
      <rule>
        key circuit_breaker_state
        pattern .*
        tag circuit_breaker.normal.${tag}
      </rule>
    </match>
    
    # Normal processing (circuit closed/half-open)
    <match circuit_breaker.normal.**>
      @type elasticsearch
      host elasticsearch.example.com
    
      <buffer>
        retry_max_times 2
      </buffer>
    
      # On failure, record error and use fallback
      <secondary>
        @type exec
        command echo $(date +%s) >> /tmp/elasticsearch_errors
        <format>
          @type none
        </format>
      </secondary>
    </match>
    
    # Bypass processing (circuit open)
    <match circuit_breaker.bypass.**>
      @type file
      path /var/log/fluent/circuit_breaker_bypass.log
    
      <format>
        @type json
      </format>
    </match>
    ANSI
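
    The inline circuit-breaker expression is easier to reason about (and test) as a standalone method; a sketch of the same state machine, using the same error file, 5-failure threshold, and 300-second cool-down:

    # Same logic as the ${...} block above, extracted for clarity and testing.
    ERROR_FILE = '/tmp/elasticsearch_errors'

    def circuit_state(now = Time.now.to_i)
      return 'closed' unless File.exist?(ERROR_FILE)

      lines           = File.readlines(ERROR_FILE)
      error_count     = lines.size
      last_error_time = lines.last.to_i

      if error_count >= 5 && (now - last_error_time) < 300
        'open'        # too many recent failures: bypass the primary output
      elsif error_count >= 5
        'half_open'   # cool-down elapsed: allow a trial request
      else
        'closed'      # normal operation
      end
    end

    def record_failure(now = Time.now.to_i)
      File.open(ERROR_FILE, 'a') { |f| f.puts(now) }
    end

    puts circuit_state    # "closed" until five failures have been recorded
    Ruby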

    12.5 Data Lake Pattern

    Organize data for analytics and data science.

    # Multi-format data lake storage
    <match app.**>
      @type copy
    
      # Real-time analytics (Elasticsearch)
      <store>
        @type elasticsearch
        host elasticsearch.example.com
        index_name app-logs
      </store>
    
      # Data lake storage (S3 with partitioning)
      <store>
        @type s3
        s3_bucket data-lake-raw
    
        # Partitioned by date and service
        path raw/service=${tag_parts[0]}/year=%Y/month=%m/day=%d/hour=%H/
        s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
    
        # Multiple formats for different use cases
        <format>
          @type json
        </format>
    
        <buffer time,tag>
          @type file
          path /var/log/fluent/buffer/datalake
          timekey 3600
          timekey_wait 300
          chunk_limit_size 128MB
        </buffer>
      </store>
    
      # Processed data for ML (Parquet format)
      <store>
        @type s3
        s3_bucket data-lake-processed
        path processed/service=${tag_parts[0]}/year=%Y/month=%m/day=%d/
    
        # Use Parquet for efficient analytics
        <format>
          @type parquet
          schema [
            {name: "timestamp", type: "string"},
            {name: "level", type: "string"},
            {name: "message", type: "string"},
            {name: "service", type: "string"},
            {name: "hostname", type: "string"}
          ]
        </format>
    
        <buffer time,tag>
          @type file
          path /var/log/fluent/buffer/parquet
          timekey 86400  # Daily files
          timekey_wait 3600
          chunk_limit_size 256MB
        </buffer>
      </store>
    </match>
    ANSI

    13. Custom Plugin Development

    13.1 Plugin Architecture

    graph TD
        A[Fluentd Plugin System] --> B[Input Plugins]
        A --> C[Output Plugins]
        A --> D[Filter Plugins]
        A --> E[Parser Plugins]
        A --> F[Formatter Plugins]
        A --> G[Buffer Plugins]
    
        B --> H[Base Input Class]
        C --> I[Base Output Class]
        D --> J[Base Filter Class]
        E --> K[Base Parser Class]
        F --> L[Base Formatter Class]
        G --> M[Base Buffer Class]
    
        style A fill:#e3f2fd
        style H fill:#e8f5e8
        style I fill:#fff3e0
        style J fill:#f3e5f5
        style K fill:#ffebee
        style L fill:#e0f2f1
        style M fill:#fce4ec

    13.2 Input Plugin Example

    Create a custom input plugin that reads from a Redis list.

    # filepath: lib/fluent/plugin/in_redis_list.rb
    require 'fluent/plugin/input'
    require 'redis'
    
    module Fluent
      module Plugin
        class RedisListInput < Fluent::Plugin::Input
          Fluent::Plugin.register_input('redis_list', self)
    
          desc 'Redis host'
          config_param :host, :string, default: 'localhost'
    
          desc 'Redis port'
          config_param :port, :integer, default: 6379
    
          desc 'Redis password'
          config_param :password, :string, default: nil, secret: true
    
          desc 'Redis database number'
          config_param :db, :integer, default: 0
    
          desc 'Redis list key to read from'
          config_param :key, :string
    
          desc 'Tag to assign to events'
          config_param :tag, :string
    
          desc 'Polling interval in seconds'
          config_param :interval, :time, default: 1
    
          desc 'Batch size for each poll'
          config_param :batch_size, :integer, default: 100
    
          def configure(conf)
            super
    
            @redis_config = {
              host: @host,
              port: @port,
              password: @password,
              db: @db,
              timeout: 5
            }
          end
    
          def start
            super
            @redis = Redis.new(@redis_config)
            @thread = Thread.new(&method(:run))
          end
    
          def shutdown
            @thread.terminate if @thread
            @redis.quit if @redis
            super
          end
    
          private
    
          def run
            while true
              begin
                messages = @redis.lrange(@key, 0, @batch_size - 1)
    
                if messages.any?
                  # Remove processed messages
                  @redis.ltrim(@key, messages.size, -1)
    
                  # Process each message
                  messages.each do |message|
                    begin
                      record = JSON.parse(message)
                      time = record.delete('timestamp') || Engine.now
    
                      router.emit(@tag, time, record)
                    rescue JSON::ParserError => e
                      log.warn "Failed to parse JSON: #{message}", error: e
    
                      # Emit as raw message
                      router.emit(@tag, Engine.now, {'raw_message' => message})
                    end
                  end
                end
    
                sleep @interval
    
              rescue => e
                log.error "Redis list input error", error: e
                sleep @interval
              end
            end
          end
        end
      end
    end
    Ruby

    Usage configuration:

    <source>
      @type redis_list
      host redis.example.com
      port 6379
      password secret
      key application_logs
      tag app.redis_logs
      interval 2s
      batch_size 50
    </source>
    ANSI
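
    To exercise the plugin end to end, test messages can be pushed onto the Redis list from Ruby (assumes the redis gem and the same host/key as the configuration above; the plugin consumes from the head of the list, so RPUSH gives FIFO ordering):

    require 'redis'
    require 'json'

    # Queue a few JSON test events for the redis_list input to pick up.
    redis = Redis.new(host: 'redis.example.com', port: 6379, password: 'secret')

    3.times do |i|
      event = { 'timestamp' => Time.now.to_i, 'level' => 'info', 'message' => "test event #{i}" }
      redis.rpush('application_logs', event.to_json)
    end

    puts "queued #{redis.llen('application_logs')} events"
    Ruby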

    13.3 Filter Plugin Example

    Create a filter that adds machine learning predictions.

    # filepath: lib/fluent/plugin/filter_ml_classifier.rb
    require 'fluent/plugin/filter'
    require 'net/http'
    require 'json'
    
    module Fluent
      module Plugin
        class MLClassifierFilter < Fluent::Plugin::Filter
          Fluent::Plugin.register_filter('ml_classifier', self)
    
          desc 'ML service endpoint URL'
          config_param :endpoint_url, :string
    
          desc 'API key for ML service'
          config_param :api_key, :string, secret: true
    
          desc 'Fields to send for classification'
          config_param :input_fields, :array, default: ['message']
    
          desc 'Field name for prediction result'
          config_param :prediction_field, :string, default: 'ml_prediction'
    
          desc 'Field name for confidence score'
          config_param :confidence_field, :string, default: 'ml_confidence'
    
          desc 'Timeout for ML service calls'
          config_param :timeout, :time, default: 5
    
          desc 'Cache TTL for predictions'
          config_param :cache_ttl, :time, default: 300
    
          def configure(conf)
            super
            @cache = {}
            @uri = URI(@endpoint_url)
          end
    
          def filter(tag, time, record)
            begin
              # Create cache key from input fields
              cache_key = @input_fields.map { |field| record[field] }.join('|')
    
              # Check cache first
              if cached_result = get_from_cache(cache_key)
                record[@prediction_field] = cached_result[:prediction]
                record[@confidence_field] = cached_result[:confidence]
                return record
              end
    
              # Prepare ML service request
              input_data = {}
              @input_fields.each do |field|
                input_data[field] = record[field] if record.key?(field)
              end
    
              # Call ML service
              prediction_result = call_ml_service(input_data)
    
              if prediction_result
                record[@prediction_field] = prediction_result[:prediction]
                record[@confidence_field] = prediction_result[:confidence]
    
                # Cache the result
                set_cache(cache_key, prediction_result)
              end
    
            rescue => e
              log.warn "ML classification failed", error: e, record: record
              record[@prediction_field] = 'error'
              record[@confidence_field] = 0.0
            end
    
            record
          end
    
          private
    
          def call_ml_service(input_data)
            http = Net::HTTP.new(@uri.host, @uri.port)
            http.use_ssl = (@uri.scheme == 'https')
            http.read_timeout = @timeout
    
            request = Net::HTTP::Post.new(@uri.path)
            request['Content-Type'] = 'application/json'
            request['Authorization'] = "Bearer #{@api_key}"
            request.body = input_data.to_json
    
            response = http.request(request)
    
            if response.code == '200'
              result = JSON.parse(response.body)
              {
                prediction: result['prediction'],
                confidence: result['confidence']
              }
            else
              log.warn "ML service returned error", code: response.code, body: response.body
              nil
            end
          end
    
          def get_from_cache(key)
            if @cache.key?(key)
              entry = @cache[key]
              if Time.now - entry[:timestamp] < @cache_ttl
                return entry[:result]
              else
                @cache.delete(key)
              end
            end
            nil
          end
    
          def set_cache(key, result)
            @cache[key] = {
              result: result,
              timestamp: Time.now
            }
    
            # Simple cache cleanup
            if @cache.size > 1000
              @cache = @cache.select { |k, v| Time.now - v[:timestamp] < @cache_ttl }
            end
          end
        end
      end
    end
    Ruby

    13.4 Output Plugin Example

    Create an output plugin for a custom webhook service.

    # filepath: lib/fluent/plugin/out_webhook.rb
    require 'fluent/plugin/output'
    require 'net/http'
    require 'json'
    require 'openssl'
    
    module Fluent
      module Plugin
        class WebhookOutput < Fluent::Plugin::Output
          Fluent::Plugin.register_output('webhook', self)
    
          desc 'Webhook endpoint URL'
          config_param :endpoint_url, :string
    
          desc 'HTTP method'
          config_param :http_method, :enum, list: [:post, :put, :patch], default: :post
    
          desc 'Request headers'
          config_param :headers, :hash, default: {}
    
          desc 'Authentication type'
          config_param :auth_type, :enum, list: [:none, :basic, :bearer, :signature], default: :none
    
          desc 'Username for basic auth'
          config_param :username, :string, default: nil, secret: true
    
          desc 'Password for basic auth'
          config_param :password, :string, default: nil, secret: true
    
          desc 'Bearer token'
          config_param :bearer_token, :string, default: nil, secret: true
    
          desc 'Signature secret for HMAC'
          config_param :signature_secret, :string, default: nil, secret: true
    
          desc 'Signature header name'
          config_param :signature_header, :string, default: 'X-Signature'
    
          desc 'Request timeout'
          config_param :timeout, :time, default: 10
    
          desc 'SSL verification'
          config_param :ssl_verify, :bool, default: true
    
          def configure(conf)
            super
            @uri = URI(@endpoint_url)
    
            # Validate configuration
            if @auth_type == :basic && (@username.nil? || @password.nil?)
              raise Fluent::ConfigError, 'username and password required for basic auth'
            end
    
            if @auth_type == :bearer && @bearer_token.nil?
              raise Fluent::ConfigError, 'bearer_token required for bearer auth'
            end
    
            if @auth_type == :signature && @signature_secret.nil?
              raise Fluent::ConfigError, 'signature_secret required for signature auth'
            end
          end
    
          def multi_workers_ready?
            true
          end
    
          def write(chunk)
            events = []
            chunk.msgpack_each do |time, record|
              events << {
                timestamp: Time.at(time).utc.iso8601,
                record: record
              }
            end
    
            payload = {
              events: events,
              metadata: {
                count: events.size,
                chunk_id: chunk.unique_id.unpack1('H*')  # hex-encode the binary chunk id so it serializes as JSON
              }
            }
    
            send_webhook(payload)
          end
    
          private
    
          def send_webhook(payload)
            http = Net::HTTP.new(@uri.host, @uri.port)
            http.use_ssl = (@uri.scheme == 'https')
            http.verify_mode = @ssl_verify ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE
            http.read_timeout = @timeout
            http.open_timeout = @timeout
    
            # Create request
            request = case @http_method
                      when :post
                        Net::HTTP::Post.new(@uri.request_uri)
                      when :put
                        Net::HTTP::Put.new(@uri.request_uri)
                      when :patch
                        Net::HTTP::Patch.new(@uri.request_uri)
                      end
    
            # Set headers
            request['Content-Type'] = 'application/json'
            @headers.each { |key, value| request[key] = value }
    
            # Set authentication
            case @auth_type
            when :basic
              request.basic_auth(@username, @password)
            when :bearer
              request['Authorization'] = "Bearer #{@bearer_token}"
            when :signature
              body = payload.to_json
              signature = OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha256'), @signature_secret, body)
              request[@signature_header] = "sha256=#{signature}"
              request.body = body
            end
    
            # Set body if not already set
            request.body ||= payload.to_json
    
            # Send request
            response = http.request(request)
    
            unless response.code.start_with?('2')
              raise "Webhook request failed: #{response.code} #{response.body}"
            end
    
            log.debug "Webhook sent successfully", code: response.code, events: payload[:metadata][:count]
    
          rescue => e
            log.error "Failed to send webhook", error: e
            raise
          end
        end
      end
    end
    Ruby

    13.5 Plugin Testing

    Create comprehensive tests for your plugins.

    # filepath: test/plugin/test_in_redis_list.rb
    require 'helper'
    require 'fluent/test/driver/input'
    require 'fluent/plugin/in_redis_list'
    
    class RedisListInputTest < Test::Unit::TestCase
      def setup
        Fluent::Test.setup
      end
    
      CONFIG = %[
        host localhost
        port 6379
        key test_list
        tag test.redis
        interval 1s
        batch_size 10
      ]
    
      def create_driver(conf = CONFIG)
        Fluent::Test::Driver::Input.new(Fluent::Plugin::RedisListInput).configure(conf)
      end
    
      test 'configure' do
        d = create_driver
        assert_equal 'localhost', d.instance.host
        assert_equal 6379, d.instance.port
        assert_equal 'test_list', d.instance.key
        assert_equal 'test.redis', d.instance.tag
      end
    
      test 'emit events from redis list' do
        # Mock Redis
        redis_mock = mock('redis')
        redis_mock.expects(:lrange).returns(['{"message": "test1"}', '{"message": "test2"}'])
        redis_mock.expects(:ltrim).with('test_list', 2, -1)
        redis_mock.expects(:quit)
    
        Redis.stubs(:new).returns(redis_mock)
    
        d = create_driver
    
        d.run(expect_emits: 2, timeout: 5) do
          # Events will be emitted by the plugin
        end
    
        events = d.events
        assert_equal 2, events.length
        assert_equal 'test.redis', events[0][0]
        assert_equal({'message' => 'test1'}, events[0][2])
        assert_equal({'message' => 'test2'}, events[1][2])
      end
    end
    Ruby
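
    The buffered webhook output from section 13.4 can be exercised the same way with the output test driver. The sketch below stubs the plugin's private send_webhook method with mocha instead of making real HTTP calls, and assumes the project's test/helper.rb includes Fluent::Test::Helpers (for event_time), as in the example above.

    # filepath: test/plugin/test_out_webhook.rb
    require 'helper'
    require 'fluent/test/driver/output'
    require 'fluent/plugin/out_webhook'
    
    class WebhookOutputTest < Test::Unit::TestCase
      def setup
        Fluent::Test.setup
      end
    
      CONFIG = %[
        endpoint_url https://hooks.example.com/fluentd
        auth_type bearer
        bearer_token test-token
      ]
    
      def create_driver(conf = CONFIG)
        Fluent::Test::Driver::Output.new(Fluent::Plugin::WebhookOutput).configure(conf)
      end
    
      test 'configure' do
        d = create_driver
        assert_equal 'https://hooks.example.com/fluentd', d.instance.endpoint_url
        assert_equal :bearer, d.instance.auth_type
      end
    
      test 'rejects bearer auth without a token' do
        assert_raise(Fluent::ConfigError) do
          create_driver(%[
            endpoint_url https://hooks.example.com/fluentd
            auth_type bearer
          ])
        end
      end
    
      test 'write wraps buffered events in a payload' do
        sent = []
        d = create_driver
        # Capture payloads instead of calling the real webhook endpoint
        d.instance.stubs(:send_webhook).with { |payload| sent << payload; true }
    
        d.run(default_tag: 'test.webhook') do
          d.feed(event_time, { 'message' => 'hello' })
          d.feed(event_time, { 'message' => 'world' })
        end
    
        total = sent.sum { |payload| payload[:metadata][:count] }
        assert_equal 2, total
      end
    end
    Ruby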

    13.6 Plugin Distribution

    Gem Structure

    fluent-plugin-myservice/
    ├── fluent-plugin-myservice.gemspec
    ├── lib/
    │   └── fluent/
    │       └── plugin/
    │           ├── in_myservice.rb
    │           ├── out_myservice.rb
    │           └── filter_myservice.rb
    ├── test/
    │   └── plugin/
    │       ├── test_in_myservice.rb
    │       ├── test_out_myservice.rb
    │       └── test_filter_myservice.rb
    ├── README.md
    └── LICENSE
    ANSI

    Gemspec Example

    # filepath: fluent-plugin-myservice.gemspec
    Gem::Specification.new do |spec|
      spec.name          = "fluent-plugin-myservice"
      spec.version       = "1.0.0"
      spec.authors       = ["Your Name"]
      spec.email         = ["your.email@example.com"]
    
      spec.summary       = "Fluentd plugin for MyService"
      spec.description   = "Input, output, and filter plugins for MyService integration"
      spec.homepage      = "https://github.com/yourname/fluent-plugin-myservice"
      spec.license       = "MIT"
    
      spec.files         = Dir['lib/**/*', 'README.md', 'LICENSE']
      spec.require_paths = ["lib"]
    
      spec.add_dependency "fluentd", ">= 1.0", "< 2"
      spec.add_dependency "httparty", "~> 0.18"
    
      spec.add_development_dependency "bundler", "~> 2.0"
      spec.add_development_dependency "rake", "~> 13.0"
      spec.add_development_dependency "test-unit", "~> 3.0"
      spec.add_development_dependency "mocha", "~> 1.0"
    end
    ANSI
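
    With the gemspec in place, building and publishing follows the standard RubyGems flow. The commands below assume RubyGems.org credentials are configured and that the target node uses the bundled fluent-gem wrapper.

    # Build the gem from the gemspec
    gem build fluent-plugin-myservice.gemspec
    
    # Publish to RubyGems.org
    gem push fluent-plugin-myservice-1.0.0.gem
    
    # Install on a Fluentd node (use td-agent-gem on td-agent installs)
    fluent-gem install fluent-plugin-myservice
    Bash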

    14. Production Deployment

    14.1 Deployment Architecture

    graph TB
        A[Application Servers] --> B[Fluentd Forwarders]
        C[System Logs] --> B
        D[Container Logs] --> E[Fluentd DaemonSet]
    
        B --> F[Load Balancer]
        E --> F
    
        F --> G[Fluentd Aggregators]
        G --> H[Elasticsearch Cluster]
        G --> I[S3 Storage]
        G --> J[Monitoring Systems]
    
        K[Fluentd Monitoring] --> G
        L[Config Management] --> G
    
        style G fill:#f9f,stroke:#333,stroke-width:2px
        style F fill:#e3f2fd

    14.2 High Availability Setup

    Load Balancer Configuration

    # nginx TCP (stream) load balancing for the forward protocol
    stream {
        upstream fluentd_aggregators {
            least_conn;
            server fluentd-agg-1:24224 weight=1 max_fails=3 fail_timeout=30s;
            server fluentd-agg-2:24224 weight=1 max_fails=3 fail_timeout=30s;
            server fluentd-agg-3:24224 weight=1 max_fails=3 fail_timeout=30s;
        }
    
        server {
            listen 24224;
            proxy_pass fluentd_aggregators;
            proxy_timeout 60s;
            proxy_connect_timeout 5s;
        }
    }
    ANSI

    Aggregator Configuration

    # fluentd-aggregator.conf
    <system>
      workers 4
      root_dir /var/log/fluent
      log_level info
    </system>
    
    # Input from forwarders
    <source>
      @type forward
      port 24224
      bind 0.0.0.0
    
      # Security
      <security>
        self_hostname "#{ENV['HOSTNAME']}"
        shared_key "#{ENV['FLUENT_SHARED_KEY']}"
      </security>
    
      # Performance
      chunk_size_limit 8MB
      chunk_size_warn_limit 8MB
    </source>
    
    # Health check endpoint
    <source>
      @type http
      port 8888
      bind 0.0.0.0
    </source>
    
    # Metrics
    <source>
      @type monitor_agent
      port 24220
      bind 0.0.0.0
    </source>
    
    <source>
      @type prometheus
      port 24231
      bind 0.0.0.0
    </source>
    
    # Processing pipeline
    @include conf.d/*.conf
    
    # Fallback output
    <match **>
      @type file
      path /var/log/fluent/fallback/events.%Y%m%d.log
    
      <format>
        @type json
      </format>
    
      <buffer time>
        @type file
        path /var/log/fluent/buffer/fallback
        timekey 3600
        timekey_wait 300
      </buffer>
    </match>
    ANSI

    Forwarder Configuration

    # fluentd-forwarder.conf
    <system>
      log_level info
      suppress_repeated_stacktrace true
    </system>
    
    # Local log collection
    <source>
      @type tail
      path /var/log/app/*.log
      pos_file /var/log/fluent/app.log.pos
      tag app.logs
    
      <parse>
        @type json
        time_key timestamp
        time_format %Y-%m-%d %H:%M:%S %z
      </parse>
    </source>
    
    <source>
      @type tail
      path /var/log/nginx/*.log
      pos_file /var/log/fluent/nginx.log.pos
      tag nginx.logs
    
      <parse>
        @type nginx
      </parse>
    </source>
    
    # Forward to aggregators
    <match **>
      @type forward
    
      # Multiple aggregators for HA
      <server>
        name agg1
        host fluentd-agg-1.example.com
        port 24224
        weight 60
      </server>
    
      <server>
        name agg2
        host fluentd-agg-2.example.com
        port 24224
        weight 60
        standby
      </server>
    
      <server>
        name agg3
        host fluentd-agg-3.example.com
        port 24224
        weight 60
        standby
      </server>
    
      # Security
      <security>
        self_hostname "#{ENV['HOSTNAME']}"
        shared_key "#{ENV['FLUENT_SHARED_KEY']}"
      </security>
    
      # Buffering
      <buffer>
        @type file
        path /var/log/fluent/buffer/forward
        chunk_limit_size 16MB
        queue_limit_length 128
        flush_mode interval
        flush_interval 10s
        retry_type exponential_backoff
        retry_forever true
      </buffer>
    
      # Fallback to local storage
      <secondary>
        @type file
        path /var/log/fluent/failed/forward.%Y%m%d.log
    
        <format>
          @type json
        </format>
      </secondary>
    </match>
    ANSI

    14.3 Kubernetes Deployment

    DaemonSet for Log Collection

    # fluentd-daemonset.yaml
    apiVersion: apps/v1
    kind: DaemonSet
    metadata:
      name: fluentd
      namespace: logging
      labels:
        k8s-app: fluentd-logging
    spec:
      selector:
        matchLabels:
          name: fluentd
      template:
        metadata:
          labels:
            name: fluentd
        spec:
          serviceAccountName: fluentd
          tolerations:
          - key: node-role.kubernetes.io/master
            effect: NoSchedule
          containers:
          - name: fluentd
            image: fluent/fluentd-kubernetes-daemonset:v1-debian-elasticsearch
            env:
            - name: FLUENT_ELASTICSEARCH_HOST
              value: "elasticsearch.logging.svc.cluster.local"
            - name: FLUENT_ELASTICSEARCH_PORT
              value: "9200"
            - name: FLUENT_ELASTICSEARCH_SCHEME
              value: "http"
            - name: FLUENTD_SYSTEMD_CONF
              value: disable
            resources:
              limits:
                memory: 200Mi
                cpu: 100m
              requests:
                memory: 200Mi
                cpu: 100m
            volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: varlibdockercontainers
              mountPath: /var/lib/docker/containers
              readOnly: true
            - name: fluentd-config
              mountPath: /fluentd/etc/conf.d
          terminationGracePeriodSeconds: 30
          volumes:
          - name: varlog
            hostPath:
              path: /var/log
          - name: varlibdockercontainers
            hostPath:
              path: /var/lib/docker/containers
          - name: fluentd-config
            configMap:
              name: fluentd-config
    YAML
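
    The DaemonSet references a fluentd service account, which needs read access to pod and namespace metadata for the Kubernetes metadata filter. A minimal RBAC sketch (names assumed to match the manifest above):

    # fluentd-rbac.yaml
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: fluentd
      namespace: logging
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRole
    metadata:
      name: fluentd
    rules:
    - apiGroups: [""]
      resources: ["pods", "namespaces"]
      verbs: ["get", "list", "watch"]
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRoleBinding
    metadata:
      name: fluentd
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: ClusterRole
      name: fluentd
    subjects:
    - kind: ServiceAccount
      name: fluentd
      namespace: logging
    YAML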

    Aggregator Deployment

    # fluentd-aggregator.yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: fluentd-aggregator
      namespace: logging
    spec:
      replicas: 3
      selector:
        matchLabels:
          app: fluentd-aggregator
      template:
        metadata:
          labels:
            app: fluentd-aggregator
        spec:
          containers:
          - name: fluentd
            image: fluent/fluentd:v1.15-1
            ports:
            - containerPort: 24224
              name: forward
            - containerPort: 24220
              name: monitor
            - containerPort: 24231
              name: metrics
            env:
            - name: FLUENTD_CONF
              value: fluent.conf
            - name: FLUENT_ELASTICSEARCH_HOST
              value: elasticsearch.logging.svc.cluster.local
            resources:
              limits:
                memory: 1Gi
                cpu: 500m
              requests:
                memory: 1Gi
                cpu: 500m
            volumeMounts:
            - name: fluentd-config
              mountPath: /fluentd/etc
            - name: buffer-storage
              mountPath: /var/log/fluent
            livenessProbe:
              httpGet:
                path: /api/plugins.json
                port: 24220
              initialDelaySeconds: 30
              periodSeconds: 30
            readinessProbe:
              httpGet:
                path: /api/plugins.json
                port: 24220
              initialDelaySeconds: 10
              periodSeconds: 10
          volumes:
          - name: fluentd-config
            configMap:
              name: fluentd-aggregator-config
          - name: buffer-storage
            emptyDir:
              sizeLimit: 10Gi
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: fluentd-aggregator
      namespace: logging
    spec:
      selector:
        app: fluentd-aggregator
      ports:
      - name: forward
        port: 24224
        targetPort: 24224
      - name: monitor
        port: 24220
        targetPort: 24220
      - name: metrics
        port: 24231
        targetPort: 24231
      type: ClusterIP
    YAML

    14.4 Configuration Management

    Ansible Playbook

    # deploy-fluentd.yml
    ---
    - hosts: fluentd_aggregators
      become: yes
      vars:
        fluentd_version: "1.15.3"
        elasticsearch_hosts: 
          - "es1.example.com:9200"
          - "es2.example.com:9200"
          - "es3.example.com:9200"
    
      tasks:
      - name: Install Fluentd
        package:
          name: fluent-package
          state: present
    
      - name: Create fluentd directories
        file:
          path: "{{ item }}"
          state: directory
          owner: fluent
          group: fluent
          mode: '0755'
        loop:
          - /etc/fluent/conf.d
          - /var/log/fluent/buffer
          - /var/log/fluent/failed
    
      - name: Deploy main configuration
        template:
          src: fluent.conf.j2
          dest: /etc/fluent/fluent.conf
          owner: fluent
          group: fluent
          mode: '0644'
        notify: restart fluentd
    
      - name: Deploy pipeline configurations
        template:
          src: "{{ item }}.j2"
          dest: "/etc/fluent/conf.d/{{ item }}"
          owner: fluent
          group: fluent
          mode: '0644'
        loop:
          - 01-sources.conf
          - 02-filters.conf
          - 03-outputs.conf
        notify: restart fluentd
    
      - name: Configure systemd service
        systemd:
          name: fluentd
          enabled: yes
          state: started
          daemon_reload: yes
    
      handlers:
      - name: restart fluentd
        systemd:
          name: fluentd
          state: restarted
    YAML
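
    A useful addition is a configuration check before the restart handler fires, so a broken template never takes down an aggregator. A sketch of such a task, using Fluentd's --dry-run flag:

      - name: Validate Fluentd configuration
        command: fluentd --dry-run -c /etc/fluent/fluent.conf
        changed_when: false
    YAML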

    Docker Compose

    # docker-compose.yml
    version: '3.8'
    
    services:
      fluentd-aggregator-1:
        image: fluent/fluentd:v1.15-1
        container_name: fluentd-agg-1
        ports:
          - "24226:24224"   # host port 24224 is published by the nginx load balancer below
          - "24220:24220"
          - "24231:24231"
        volumes:
          - ./config:/fluentd/etc
          - fluentd-buffer-1:/var/log/fluent
        environment:
          - FLUENTD_CONF=fluent.conf
          - FLUENT_ELASTICSEARCH_HOST=elasticsearch
        depends_on:
          - elasticsearch
        restart: unless-stopped
        networks:
          - logging
        healthcheck:
          test: ["CMD", "curl", "-f", "http://localhost:24220/api/plugins.json"]
          interval: 30s
          timeout: 10s
          retries: 3
    
      fluentd-aggregator-2:
        image: fluent/fluentd:v1.15-1
        container_name: fluentd-agg-2
        ports:
          - "24225:24224"
          - "24221:24220"
          - "24232:24231"
        volumes:
          - ./config:/fluentd/etc
          - fluentd-buffer-2:/var/log/fluent
        environment:
          - FLUENTD_CONF=fluent.conf
          - FLUENT_ELASTICSEARCH_HOST=elasticsearch
        depends_on:
          - elasticsearch
        restart: unless-stopped
        networks:
          - logging
    
      elasticsearch:
        image: docker.elastic.co/elasticsearch/elasticsearch:7.15.0
        container_name: elasticsearch
        environment:
          - discovery.type=single-node
          - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
        ports:
          - "9200:9200"
        volumes:
          - elasticsearch-data:/usr/share/elasticsearch/data
        networks:
          - logging
    
      nginx-lb:
        image: nginx:alpine
        container_name: fluentd-lb
        ports:
          - "24224:24224"
        volumes:
          - ./nginx.conf:/etc/nginx/nginx.conf:ro
        depends_on:
          - fluentd-aggregator-1
          - fluentd-aggregator-2
        networks:
          - logging
    
    volumes:
      fluentd-buffer-1:
      fluentd-buffer-2:
      elasticsearch-data:
    
    networks:
      logging:
        driver: bridge
    YAML
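
    The compose file mounts ./nginx.conf into the load balancer container, but the file itself is not shown above. A minimal sketch, with service names taken from the compose file and TCP balancing via the stream module:

    # nginx.conf
    events {}
    
    stream {
        upstream fluentd_aggregators {
            least_conn;
            server fluentd-aggregator-1:24224 max_fails=3 fail_timeout=30s;
            server fluentd-aggregator-2:24224 max_fails=3 fail_timeout=30s;
        }
    
        server {
            listen 24224;
            proxy_pass fluentd_aggregators;
            proxy_connect_timeout 5s;
        }
    }
    ANSI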

    14.5 Security Hardening

    SSL/TLS Configuration

    # Secure forward input
    <source>
      @type forward
      port 24224
      bind 0.0.0.0
    
      <transport tls>
        cert_path /etc/fluent/certs/server.crt
        private_key_path /etc/fluent/certs/server.key
        ca_cert_path /etc/fluent/certs/ca.crt
        client_cert_auth true
      </transport>
    
      <security>
        self_hostname fluentd-aggregator
        shared_key "#{ENV['FLUENT_SHARED_KEY']}"
    
        <client>
          host fluentd-forwarder-1
          shared_key "#{ENV['FLUENT_SHARED_KEY']}"
        </client>
      </security>
    </source>
    
    # Secure forward output
    <match **>
      @type forward
    
      <server>
        host secure-aggregator.example.com
        port 24224
      </server>
    
      <transport tls>
        cert_path /etc/fluent/certs/client.crt
        private_key_path /etc/fluent/certs/client.key
        ca_cert_path /etc/fluent/certs/ca.crt
      </transport>
    
      <security>
        self_hostname fluentd-forwarder
        shared_key "#{ENV['FLUENT_SHARED_KEY']}"
      </security>
    </match>
    ANSI
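
    The certificate paths above assume an internal CA. For a lab setup, a self-signed CA and server certificate can be generated with openssl along these lines (production should use your organization's PKI):

    # Create a test CA
    openssl genrsa -out ca.key 4096
    openssl req -new -x509 -days 365 -key ca.key -out ca.crt -subj "/CN=fluentd-test-ca"
    
    # Create and sign the server certificate
    openssl genrsa -out server.key 4096
    openssl req -new -key server.key -out server.csr -subj "/CN=fluentd-aggregator"
    openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key \
      -CAcreateserial -out server.crt
    Bash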

    Role-based Access Control

    # Create fluentd user with minimal privileges
    useradd -r -s /bin/false -d /var/lib/fluentd fluentd
    
    # Set file permissions
    chown -R fluentd:fluentd /etc/fluent
    chown -R fluentd:fluentd /var/log/fluent
    chmod 750 /etc/fluent
    chmod 640 /etc/fluent/*.conf
    chmod 600 /etc/fluent/certs/*
    
    # SELinux context (if enabled)
    semanage fcontext -a -t admin_home_exec_t "/usr/sbin/fluentd"
    restorecon -R /usr/sbin/fluentd
    Bash

    15. Best Practices

    15.1 Configuration Best Practices

    1. Organize Configuration Files

    # Main configuration file
    # /etc/fluent/fluent.conf
    <system>
      @include system.conf
    </system>
    
    @include conf.d/*.conf
    
    # Global error handling
    <label @ERROR>
      <match **>
        @type file
        path /var/log/fluent/error/error.%Y%m%d.log
    
        <format>
          @type json
        </format>
      </match>
    </label>
    ANSI
    # /etc/fluent/conf.d/01-inputs.conf
    <source>
      @type forward
      port 24224
      bind 0.0.0.0
    </source>
    
    <source>
      @type tail
      path /var/log/app/*.log
      pos_file /var/log/fluent/app.log.pos
      tag app.logs
    
      <parse>
        @type json
      </parse>
    </source>
    ANSI
    # /etc/fluent/conf.d/02-filters.conf
    <filter app.**>
      @type record_transformer
      <record>
        hostname ${hostname}
        environment production
      </record>
    </filter>
    
    <filter app.**>
      @type grep
      <exclude>
        key path
        pattern ^/health$
      </exclude>
    </filter>
    ANSI
    # /etc/fluent/conf.d/03-outputs.conf
    <match app.**>
      @type elasticsearch
      host elasticsearch.example.com
      index_name app-logs
    
      <buffer>
        @type file
        path /var/log/fluent/buffer/elasticsearch
        flush_mode interval
        flush_interval 10s
      </buffer>
    </match>
    ANSI

    2. Use Environment Variables

    # Environment-specific configuration
    <match app.**>
      @type elasticsearch
      host "#{ENV['ELASTICSEARCH_HOST'] || 'localhost'}"
      port "#{ENV['ELASTICSEARCH_PORT'] || 9200}"
      user "#{ENV['ELASTICSEARCH_USER']}"
      password "#{ENV['ELASTICSEARCH_PASSWORD']}"
    
      index_name "#{ENV['APP_NAME'] || 'app'}-logs"
    
      <buffer>
        @type file
        path /var/log/fluent/buffer/elasticsearch
        flush_mode interval
        flush_interval "#{ENV['FLUSH_INTERVAL'] || '10s'}"
      </buffer>
    </match>
    ANSI
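
    How those variables reach the Fluentd process depends on how it is run. For a systemd-managed install, one option is a drop-in unit file (paths and variable names here are illustrative):

    # /etc/systemd/system/fluentd.service.d/environment.conf
    [Service]
    Environment="ELASTICSEARCH_HOST=es.example.com"
    Environment="ELASTICSEARCH_PORT=9200"
    EnvironmentFile=-/etc/fluent/fluentd.env
    ANSI

    After adding the drop-in, run systemctl daemon-reload and restart the service so the new environment takes effect.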

    3. Implement Proper Error Handling

    # Comprehensive error handling strategy
    <system>
      # Global error routing
      emit_error_log_interval 30s
      suppress_repeated_stacktrace true
    </system>
    
    # Main processing with error recovery
    <match app.**>
      @type copy
    
      # Primary output
      <store>
        @type elasticsearch
        host elasticsearch.example.com
        index_name app-logs
    
        <buffer>
          retry_max_times 3
          retry_type exponential_backoff
        </buffer>
    
        # Route failures to error handling
        <secondary>
          @type relabel
          @label @ERROR_RECOVERY
        </secondary>
      </store>
    </match>
    
    # Error recovery pipeline
    <label @ERROR_RECOVERY>
      <filter **>
        @type record_transformer
        enable_ruby true
        <record>
          error_timestamp ${Time.now.utc.iso8601}
          error_reason primary_output_failed
        </record>
      </filter>
    
      # Try alternative output
      <match **>
        @type s3
        s3_bucket backup-logs
        path error-recovery/
    
        <buffer>
          flush_mode immediate
        </buffer>
    
        # Final fallback to local storage
        <secondary>
          @type file
          path /var/log/fluent/ultimate_fallback.log
        </secondary>
      </match>
    </label>
    ANSI

    15.2 Performance Best Practices

    1. Optimize Input Performance

    # High-performance tail configuration
    <source>
      @type tail
      path /var/log/app/*.log
      pos_file /var/log/fluent/app.log.pos
      tag app.logs
    
      # Performance optimizations
      read_from_head false
      refresh_interval 60
      rotate_wait 5
      enable_watch_timer false
      enable_stat_watcher true
      open_on_every_update false
      read_lines_limit 1000
    
      # Skip unnecessary parsing at input
      <parse>
        @type none
      </parse>
    </source>
    
    # Parse only when needed
    <filter app.logs>
      @type parser
      key_name message
      reserve_data true
    
      <parse>
        @type json
        json_parser oj  # Fastest JSON parser
      </parse>
    </filter>
    ANSI

    2. Buffer Optimization Patterns

    # Pattern 1: High-throughput, low-latency
    <match realtime.**>
      @type elasticsearch
    
      <buffer>
        @type memory
        chunk_limit_size 8MB
        flush_mode immediate
        flush_thread_count 4
        queue_limit_length 32
      </buffer>
    </match>
    
    # Pattern 2: High-volume, batch processing
    <match batch.**>
      @type s3
    
      <buffer time>
        @type file
        path /var/log/fluent/buffer/s3
        timekey 3600
        timekey_wait 300
        chunk_limit_size 256MB
        flush_thread_count 2
      </buffer>
    </match>
    
    # Pattern 3: Critical data, maximum reliability
    <match critical.**>
      @type elasticsearch
    
      <buffer>
        @type file
        path /var/log/fluent/buffer/critical
        chunk_limit_size 32MB
        queue_limit_length 128
        flush_mode interval
        flush_interval 30s
        flush_at_shutdown true
        retry_forever true
        overflow_action block
      </buffer>
    </match>
    ANSI

    15.3 Security Best Practices

    1. Data Sanitization

    # Remove sensitive data
    <filter app.**>
      @type record_transformer
      enable_ruby true
    
      <record>
        # Mask credit card numbers
        message ${record["message"].gsub(/\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b/, "****-****-****-****")}
    
        # Remove email addresses
        sanitized_message ${record["message"].gsub(/\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b/, "[EMAIL_REDACTED]")}
      </record>
    
      # Remove sensitive fields entirely
      remove_keys password,secret,token,api_key,authorization
    </filter>
    
    # PII detection and handling
    # (a filter cannot route events to a label; retag SSN-matching events with
    #  rewrite_tag_filter instead and send them to a dedicated output)
    <match app.**>
      @type rewrite_tag_filter
    
      <rule>
        key message
        pattern /\b\d{3}-\d{2}-\d{4}\b/  # SSN pattern
        tag pii.${tag}
      </rule>
    
      # Everything else continues to the normal outputs under clean.*
      <rule>
        key message
        pattern /.*/
        tag clean.${tag}
      </rule>
    </match>
    
    <match pii.**>
      @type file
      path /var/log/fluent/pii_detected.log
    
      <format>
        @type json
      </format>
    </match>
    ANSI

    2. Access Control and Encryption

    # Encrypted communication
    <source>
      @type forward
      port 24224
    
      <transport tls>
        cert_path /etc/fluent/certs/server.crt
        private_key_path /etc/fluent/certs/server.key
        ca_cert_path /etc/fluent/certs/ca.crt
        client_cert_auth true
        ciphers TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256
        insecure false
      </transport>
    
      <security>
        self_hostname fluentd-server
        shared_key "#{ENV['FLUENT_SHARED_KEY']}"
        allow_anonymous_source false
    
        <client>
          host allowed-client.example.com
          shared_key "#{ENV['FLUENT_SHARED_KEY']}"
        </client>
      </security>
    </source>
    ANSI

    15.4 Monitoring and Alerting Best Practices

    1. Comprehensive Metrics Collection

    # System metrics
    <system>
      enable_input_metrics true
      enable_size_metrics true
    
      <metrics>
        @type prometheus
        port 24231
        bind 0.0.0.0
      </metrics>
    </system>
    
    # Custom application metrics
    <filter app.**>
      @type prometheus
    
      <metric>
        name fluentd_app_log_total
        type counter
        desc Total number of application logs
        <labels>
          service_name ${record["service"]}
          log_level ${record["level"]}
          hostname ${hostname}
        </labels>
      </metric>
    
      <metric>
        name fluentd_app_response_time
        type histogram
        desc Application response time distribution
        key response_time
        buckets 0.1,0.5,1.0,5.0,10.0
        <labels>
          service_name ${record["service"]}
          endpoint ${record["endpoint"]}
        </labels>
      </metric>
    </filter>
    ANSI

    2. Health Check Implementation

    #!/bin/bash
    # fluentd_health_check.sh
    
    set -e
    
    HEALTH_CHECK_URL="http://localhost:24220/api/plugins.json"
    BUFFER_THRESHOLD=1000
    ERROR_LOG="/var/log/fluent/fluentd.log"
    
    # Check if Fluentd process is running
    if ! pgrep -f fluentd > /dev/null; then
        echo "CRITICAL: Fluentd process not running"
        exit 2
    fi
    
    # Check HTTP endpoint
    if ! curl -f -s "$HEALTH_CHECK_URL" > /dev/null; then
        echo "CRITICAL: Monitor agent not responding"
        exit 2
    fi
    
    # Check buffer queue lengths
    BUFFER_SIZE=$(curl -s "$HEALTH_CHECK_URL" | \
        jq '.plugins[] | select(.type=="output") | .buffer_queue_length // 0' | \
        awk '{sum+=$1} END {print sum+0}')
    
    if [ "$BUFFER_SIZE" -gt "$BUFFER_THRESHOLD" ]; then
        echo "WARNING: High buffer queue length: $BUFFER_SIZE"
        exit 1
    fi
    
    # Check for recent errors
    ERROR_COUNT=$(tail -n 100 "$ERROR_LOG" | grep -c "ERROR" || true)
    if [ "$ERROR_COUNT" -gt 5 ]; then
        echo "WARNING: High error count in recent logs: $ERROR_COUNT"
        exit 1
    fi
    
    # Check disk space for buffer directory
    BUFFER_USAGE=$(df /var/log/fluent | awk 'NR==2 {print $5}' | sed 's/%//')
    if [ "$BUFFER_USAGE" -gt 80 ]; then
        echo "WARNING: Buffer disk usage high: ${BUFFER_USAGE}%"
        exit 1
    fi
    
    echo "OK: Fluentd is healthy"
    exit 0
    Bash
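
    Because the script uses Nagios-style exit codes (0 OK, 1 warning, 2 critical), it plugs into most monitoring agents directly. For a bare-bones setup, a cron entry that logs failures is one option (script path assumed):

    # /etc/cron.d/fluentd-health
    */5 * * * * root /usr/local/bin/fluentd_health_check.sh || logger -t fluentd-health "check failed"
    Bash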

    15.5 Troubleshooting Best Practices

    1. Debug Configuration Template

    # Debug configuration for troubleshooting
    <system>
      log_level debug
      suppress_repeated_stacktrace false
      emit_error_log_interval 10s
    </system>
    
    # Add debug outputs
    <match **>
      @type copy
    
      # Your normal output
      <store>
        @type elasticsearch
        index_name app-logs
      </store>
    
      # Debug output (remove in production)
      <store>
        @type stdout
        output_type json
    
        <format>
          @type json
          include_time_key true
          time_key debug_timestamp
        </format>
      </store>
    
      # Count events for debugging
      <store>
        @type prometheus
        <metric>
          name fluentd_debug_events_total
          type counter
          desc Debug event counter
          <labels>
            tag ${tag}
            hostname ${hostname}
          </labels>
        </metric>
      </store>
    </match>
    ANSI

    2. Performance Profiling

    # Add timing information
    <filter **>
      @type record_transformer
      enable_ruby true
    
      <record>
        processing_start_time ${Time.now.to_f}
      </record>
    </filter>
    
    # Measure processing time at output
    <filter **>
      @type record_transformer
      enable_ruby true
    
      <record>
        # Keep the expression on one line; a config value ends at the newline
        processing_duration ${Time.now.to_f - record["processing_start_time"].to_f}
      </record>
    
      remove_keys processing_start_time
    </filter>
    
    # Alert on slow processing
    <filter **>
      @type grep
      <regexp>
        key processing_duration
        pattern ^[1-9]\d*\.  # More than 1 second
      </regexp>
      @label @SLOW_PROCESSING
    </filter>
    
    <label @SLOW_PROCESSING>
      <match **>
        @type slack
        webhook_url "#{ENV['SLACK_WEBHOOK']}"
        channel alerts
        message "Slow log processing detected: %s seconds"
        message_keys processing_duration
      </match>
    </label>
    ANSI

    15.6 Capacity Planning

    1. Resource Estimation

    graph TD
        A[Log Volume Assessment] --> B[Daily Log Volume GB]
        B --> C[Peak Hours Multiplier]
        C --> D[Buffer Size Calculation]
    
        E[Processing Requirements] --> F[Parse Complexity]
        F --> G[Filter Operations]
        G --> H[CPU Requirements]
    
        I[Storage Requirements] --> J[Buffer Storage]
        J --> K[Failed Event Storage]
        K --> L[Disk Space Planning]
    
        M[Network Bandwidth] --> N[Input Bandwidth]
        N --> O[Output Bandwidth]
        O --> P[Network Planning]
    
        style A fill:#e3f2fd
        style E fill:#e8f5e8
        style I fill:#fff3e0
        style M fill:#f3e5f5
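
    As a rough worked example, assuming 50 GB/day of logs, a 3x peak-hour multiplier, and a requirement to absorb a 30-minute downstream outage: 50 GB / 86,400 s is about 0.6 MB/s on average, or roughly 1.8 MB/s at peak, so the fleet should budget around 1.8 MB/s x 1,800 s, which is roughly 3 GB of file buffer (plus comfortable headroom), divided across the aggregators sharing the load.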

    2. Scaling Guidelines

    # Small deployment (< 1GB/day)
    <system>
      workers 1
    </system>
    
    <match **>
      @type elasticsearch
    
      <buffer>
        @type memory
        chunk_limit_size 8MB
        queue_limit_length 32
        flush_thread_count 1
      </buffer>
    </match>
    
    # Medium deployment (1-10GB/day)
    <system>
      workers 2
    </system>
    
    <match **>
      @type elasticsearch
    
      <buffer>
        @type file
        path /var/log/fluent/buffer
        chunk_limit_size 32MB
        queue_limit_length 128
        flush_thread_count 4
      </buffer>
    </match>
    
    # Large deployment (>10GB/day)
    <system>
      workers 4
    </system>
    
    <match **>
      @type elasticsearch
    
      <buffer>
        @type file
        path /var/log/fluent/buffer
        chunk_limit_size 64MB
        queue_limit_length 256
        flush_thread_count 8
      </buffer>
    </match>
    ANSI

    15.7 Migration Best Practices

    1. Gradual Migration Strategy

    # Phase 1: Parallel logging (old + new)
    <match app.**>
      @type copy
    
      # Existing logging system
      <store>
        @type file
        path /var/log/app/existing.log
      </store>
    
      # New Fluentd pipeline
      <store>
        @type elasticsearch
        index_name app-logs-new
      </store>
    </match>
    
    # Phase 2: Add validation
    <filter app.**>
      @type record_transformer
      enable_ruby true
    
      <record>
        migration_phase "parallel_validation"
        fluentd_processed true
      </record>
    </filter>
    
    # Phase 3: Gradual traffic shifting
    <match app.**>
      @type copy
    
      # All traffic to the new system
      <store>
        @type elasticsearch
        index_name app-logs-new
      </store>
    
      # Sampled copy to the old system for comparison
      # (a store takes a single @type; route through a label and sample there)
      <store>
        @type relabel
        @label @VALIDATION_SAMPLE
      </store>
    </match>
    
    # Keep roughly 1 in 10 events for comparison
    # (sampling_filter comes from the fluent-plugin-sampling-filter gem)
    <label @VALIDATION_SAMPLE>
      <filter **>
        @type sampling_filter
        interval 10
      </filter>
    
      <match **>
        @type file
        path /var/log/app/validation.log
      </match>
    </label>
    ANSI

    15.8 Backup and Recovery

    1. Configuration Backup

    #!/bin/bash
    # backup_fluentd_config.sh
    
    BACKUP_DIR="/backup/fluentd"
    CONFIG_DIR="/etc/fluent"
    TIMESTAMP=$(date +%Y%m%d_%H%M%S)
    
    # Create backup directory
    mkdir -p "$BACKUP_DIR"
    
    # Backup configuration
    tar -czf "$BACKUP_DIR/fluentd_config_$TIMESTAMP.tar.gz" \
        -C "$CONFIG_DIR" .
    
    # Backup buffer state
    tar -czf "$BACKUP_DIR/fluentd_buffer_$TIMESTAMP.tar.gz" \
        -C "/var/log/fluent" buffer/
    
    # Keep only last 30 days of backups
    find "$BACKUP_DIR" -name "fluentd_*.tar.gz" -mtime +30 -delete
    
    echo "Backup completed: $TIMESTAMP"
    ANSI
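
    Scheduling is left to the operator; a simple daily cron entry is usually enough (script path assumed):

    # /etc/cron.d/fluentd-backup
    15 2 * * * root /usr/local/bin/backup_fluentd_config.sh >> /var/log/fluent/backup.log 2>&1
    Bash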

    2. Disaster Recovery Plan

    # Emergency fallback configuration
    <system>
      log_level warn
      suppress_repeated_stacktrace true
    </system>
    
    # Minimal input
    <source>
      @type forward
      port 24224
      bind 0.0.0.0
    </source>
    
    # Local storage fallback
    <match **>
      @type file
      path /var/log/fluent/emergency/events.%Y%m%d_%H.log
    
      <format>
        @type json
      </format>
    
      <buffer time>
        @type file
        path /var/log/fluent/buffer/emergency
        timekey 3600
        timekey_wait 60
        flush_at_shutdown true
      </buffer>
    </match>
    ANSI

    16. Best Practices and Patterns

    16.1 Configuration Organization

    Modular Configuration Structure

    /etc/fluent/
    ├── fluent.conf              # Main entry point
    ├── system.conf              # System-wide settings
    ├── sources/                 # Input configurations
    │   ├── application.conf
    │   ├── system-logs.conf
    │   └── containers.conf
    ├── filters/                 # Processing configurations
    │   ├── enrichment.conf
    │   ├── parsing.conf
    │   └── security.conf
    ├── outputs/                 # Destination configurations
    │   ├── elasticsearch.conf
    │   ├── s3-archive.conf
    │   └── monitoring.conf
    └── templates/              # Reusable configurations
        ├── common-filters.conf
        └── error-handling.conf
    ANSI

    Main Configuration Pattern

    # /etc/fluent/fluent.conf
    <system>
      @include system.conf
    </system>
    
    # Load input configurations
    @include sources/*.conf
    
    # Load processing configurations  
    @include filters/*.conf
    
    # Load output configurations
    @include outputs/*.conf
    
    # Global error handling
    <label @ERROR>
      <match **>
        @type file
        path /var/log/fluent/errors/global.%Y%m%d.log
        <format>
          @type json
        </format>
      </match>
    </label>
    ANSI

    16.2 Performance Patterns

    High-Throughput Pattern

    # For processing >10GB/day
    <system>
      workers 4
      root_dir /var/log/fluent
    </system>
    
    <source>
      @type tail
      path /var/log/app/*.log
      pos_file /var/log/fluent/app.log.pos
      tag app.logs
    
      # Performance optimizations
      read_from_head false
      refresh_interval 60
      rotate_wait 5
      enable_watch_timer false
      enable_stat_watcher true
      open_on_every_update false
      read_lines_limit 1000
    
      <parse>
        @type none  # Parse later if needed
      </parse>
    </source>
    
    <match app.**>
      @type elasticsearch
    
      <buffer>
        @type file
        path /var/log/fluent/buffer/high-throughput
        chunk_limit_size 64MB
        queue_limit_length 512
        flush_mode immediate
        flush_thread_count 8
        overflow_action drop_oldest_chunk
      </buffer>
    </match>
    ANSI

    Low-Latency Pattern

    # For real-time processing with <1s latency
    <source>
      @type forward
      port 24224
      chunk_size_limit 1MB
    </source>
    
    <match realtime.**>
      @type elasticsearch
    
      <buffer>
        @type memory
        chunk_limit_size 1MB
        queue_limit_length 8
        flush_mode immediate
        retry_type exponential_backoff
        retry_wait 0.1s
        retry_max_interval 5s
      </buffer>
    </match>
    ANSI

    Reliability Pattern

    # For critical data that cannot be lost
    <match critical.**>
      @type copy
    
      # Primary output
      <store>
        @type elasticsearch
        host primary-es.example.com
    
        <buffer>
          @type file
          path /var/log/fluent/buffer/critical-primary
          chunk_limit_size 32MB
          queue_limit_length 128
          flush_mode interval
          flush_interval 30s
          flush_at_shutdown true
          retry_forever true
          overflow_action block
        </buffer>
    
        <secondary>
          @type relabel
          @label @BACKUP
        </secondary>
      </store>
    
      # Synchronous backup
      <store>
        @type s3
        s3_bucket critical-backup
        path critical/
    
        <buffer>
          @type file
          path /var/log/fluent/buffer/critical-backup
          flush_mode immediate
        </buffer>
      </store>
    </match>
    
    <label @BACKUP>
      <match **>
        @type s3
        s3_bucket emergency-backup
        path emergency/
      </match>
    </label>
    ANSI

    16.3 Security Patterns

    Data Sanitization Pattern

    <filter sensitive.**>
      @type record_transformer
      enable_ruby true
    
      <record>
        # Remove PII: mask credit card numbers, SSNs, email addresses, and phone
        # numbers (keep the whole expression on one line; a config value ends at
        # the newline)
        message ${record["message"].to_s.gsub(/\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b/, "****-****-****-****").gsub(/\b\d{3}-\d{2}-\d{4}\b/, "***-**-****").gsub(/\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b/, "[EMAIL_REDACTED]").gsub(/\b\d{3}-\d{3}-\d{4}\b/, "***-***-****")}
      </record>
    
      # Remove sensitive fields
      remove_keys password,secret,token,api_key,authorization,session_id
    </filter>
    ANSI

    Encryption in Transit Pattern

    <source>
      @type forward
      port 24224
    
      <transport tls>
        cert_path /etc/fluent/certs/server.crt
        private_key_path /etc/fluent/certs/server.key
        ca_cert_path /etc/fluent/certs/ca.crt
        client_cert_auth true
        ciphers TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256
        insecure false
      </transport>
    
      <security>
        self_hostname fluentd-server
        shared_key "#{ENV['FLUENT_SHARED_KEY']}"
        allow_anonymous_source false
      </security>
    </source>
    ANSI

    16.4 Monitoring Patterns

    Comprehensive Monitoring Pattern

    # Enable all metrics
    <system>
      enable_input_metrics true
      enable_size_metrics true
    
      <metrics>
        @type prometheus
        port 24231
        bind 0.0.0.0
      </metrics>
    </system>
    
    # Application-specific metrics
    <filter app.**>
      @type prometheus
    
      <metric>
        name app_logs_total
        type counter
        desc Total application logs
        <labels>
          service ${record["service"]}
          level ${record["level"]}
          environment ${record["env"]}
        </labels>
      </metric>
    
      <metric>
        name app_response_time_seconds
        type histogram
        desc Application response time
        key response_time
        buckets 0.1,0.5,1.0,5.0,10.0
        <labels>
          service ${record["service"]}
          endpoint ${record["endpoint"]}
        </labels>
      </metric>
    </filter>
    
    # Health check endpoint
    <source>
      @type http
      port 8888
      bind 0.0.0.0
      cors_allow_origins ["*"]
    </source>
    
    <match health.check>
      @type exec
      command /usr/local/bin/health_check.sh
      <format>
        @type json
      </format>
    </match>
    ANSI

    17. Troubleshooting Guide

    17.1 Quick Diagnosis Commands

    # System health
    systemctl status fluentd
    ps aux | grep fluentd
    netstat -tlnp | grep 24224
    
    # Configuration validation
    fluentd -c /etc/fluent/fluent.conf --dry-run
    fluentd --show-plugin-config=input:tail
    
    # Real-time monitoring
    curl http://localhost:24220/api/plugins.json | jq '.'
    tail -f /var/log/fluent/fluentd.log
    
    # Performance analysis
    top -p $(pgrep fluentd)
    iotop -p $(pgrep fluentd)
    lsof -p $(pgrep fluentd)
    Bash

    17.2 Common Error Messages

    | Error Message | Cause | Solution |
    |---|---|---|
    | failed to configure | Syntax error in config | Check configuration syntax with --dry-run |
    | buffer overflow | Output slower than input | Increase buffer size or improve output performance |
    | connection refused | Network connectivity issue | Check network and firewall settings |
    | certificate verify failed | SSL/TLS configuration | Verify certificates and CA chain |
    | permission denied | File system permissions | Check file ownership and permissions |
    | no such file or directory | Missing log files | Verify file paths and permissions |

    17.3 Performance Troubleshooting

    Memory Issues

    # Check memory usage patterns
    ps -p $(pgrep fluentd) -o pid,ppid,cmd,%mem,%cpu --sort=-%mem
    
    # Monitor memory over time
    while true; do
      ps -p $(pgrep fluentd) -o %mem --no-headers
      sleep 5
    done
    
    # Check for memory leaks
    valgrind --tool=massif fluentd -c fluent.conf
    Bash

    CPU Issues

    # Check CPU usage
    top -p $(pgrep fluentd)
    
    # Profile CPU usage
    perf record -p $(pgrep fluentd) sleep 30
    perf report
    Bash

    I/O Issues

    # Monitor disk I/O
    iotop -p $(pgrep fluentd)
    
    # Check file descriptors
    lsof -p $(pgrep fluentd) | wc -l
    cat /proc/$(pgrep fluentd)/limits | grep files
    Bash

    18. Plugin Reference

    18.1 Essential Input Plugins

    | Plugin | Use Case | Configuration Example |
    |---|---|---|
    | tail | File monitoring | @type tail; path /var/log/*.log |
    | forward | Receiving from other Fluentd | @type forward; port 24224 |
    | http | HTTP endpoint | @type http; port 9880 |
    | syslog | Syslog messages | @type syslog; port 5140 |
    | exec | Command output | @type exec; command "ps aux" |
    | dummy | Testing/development | @type dummy; dummy {"hello":"world"} |

    18.2 Essential Output Plugins

    | Plugin | Use Case | Configuration Example |
    |---|---|---|
    | elasticsearch | Search and analytics | @type elasticsearch; host es.example.com |
    | s3 | Object storage | @type s3; s3_bucket my-logs |
    | kafka | Message streaming | @type kafka2; brokers kafka:9092 |
    | file | Local file storage | @type file; path /var/log/output.log |
    | forward | Sending to other Fluentd | @type forward; host fluentd.example.com |
    | copy | Multiple destinations | @type copy; <store>...</store> |

    18.3 Essential Filter Plugins

    | Plugin | Use Case | Configuration Example |
    |---|---|---|
    | record_transformer | Field manipulation | @type record_transformer; <record>...</record> |
    | grep | Event filtering | @type grep; <regexp>key level; pattern ERROR</regexp> |
    | parser | Data parsing | @type parser; key_name message; <parse>...</parse> |
    | geoip | IP geolocation | @type geoip; geoip_lookup_keys remote_addr |
    | throttle | Rate limiting | @type throttle; group_bucket_limit 100 |

    18.4 Parser Plugins

    | Plugin | Use Case | Example |
    |---|---|---|
    | json | JSON parsing | @type json; json_parser yajl |
    | regexp | Regex parsing | @type regexp; expression /(?<field>.*)/ |
    | csv | CSV parsing | @type csv; keys col1,col2,col3 |
    | apache2 | Apache logs | @type apache2 |
    | nginx | Nginx logs | @type nginx |
    | syslog | Syslog format | @type syslog; message_format rfc5424 |

    19. Configuration Templates

    19.1 Web Application Logging

    # Complete web application logging setup
    <system>
      log_level info
      workers 2
    </system>
    
    # Application logs
    <source>
      @type tail
      path /var/log/myapp/*.log
      pos_file /var/log/fluent/myapp.log.pos
      tag myapp.logs
    
      <parse>
        @type json
        time_key timestamp
        time_format %Y-%m-%d %H:%M:%S %z
      </parse>
    </source>
    
    # Nginx access logs
    <source>
      @type tail
      path /var/log/nginx/access.log
      pos_file /var/log/fluent/nginx.access.log.pos
      tag nginx.access
    
      <parse>
        @type nginx
      </parse>
    </source>
    
    # Nginx error logs  
    <source>
      @type tail
      path /var/log/nginx/error.log
      pos_file /var/log/fluent/nginx.error.log.pos
      tag nginx.error
    
      <parse>
        @type regexp
        expression /^(?<time>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(?<level>\w+)\] (?<message>.*)$/
        time_format %Y/%m/%d %H:%M:%S
      </parse>
    </source>
    
    # Enrich all logs
    <filter **>
      @type record_transformer
      enable_ruby true
      <record>
        hostname ${hostname}
        environment production
        service myapp
        processed_at ${Time.now.utc.iso8601}
      </record>
    </filter>
    
    # Filter errors for alerting
    <filter *.{error,ERROR}>
      @type record_transformer
      <record>
        alert_required true
        severity high
      </record>
    </filter>
    
    # Send to Elasticsearch
    <match {myapp,nginx}.**>
      @type elasticsearch
      host elasticsearch.example.com
      port 9200
      index_name myapp-logs
    
      <buffer>
        @type file
        path /var/log/fluent/buffer/elasticsearch
        chunk_limit_size 32MB
        queue_limit_length 128
        flush_mode interval
        flush_interval 10s
        retry_type exponential_backoff
        retry_forever true
      </buffer>
    </match>
    
    # Archive to S3
    <match **>
      @type s3
      s3_bucket myapp-logs-archive
      path logs/year=%Y/month=%m/day=%d/hour=%H/
      s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
    
      <format>
        @type json
      </format>
    
      <buffer time>
        @type file
        path /var/log/fluent/buffer/s3
        timekey 3600
        timekey_wait 300
        chunk_limit_size 256MB
      </buffer>
    </match>
    ANSI

    19.2 Kubernetes Logging

    # Kubernetes container logging
    <source>
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/fluent/containers.log.pos
      tag kubernetes.*
      read_from_head true
    
      <parse>
        @type json
        time_key time
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>
    
    # Add Kubernetes metadata
    <filter kubernetes.**>
      @type kubernetes_metadata
      kubernetes_url https://kubernetes.default.svc.cluster.local
      bearer_token_file /var/run/secrets/kubernetes.io/serviceaccount/token
      ca_file /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
      annotation_match [".*"]
    </filter>
    
    # Route by namespace
    <match kubernetes.var.log.containers.**>
      @type rewrite_tag_filter
    
      <rule>
        key $.kubernetes.namespace_name
        pattern ^(kube-system|kube-public)$
        tag system.${tag}
      </rule>
    
      <rule>
        key $.kubernetes.namespace_name
        pattern ^(.+)$
        tag app.${tag}
      </rule>
    </match>
    
    # System logs to separate index
    <match system.**>
      @type elasticsearch
      host elasticsearch.logging.svc.cluster.local
      index_name k8s-system-logs
    
      <buffer>
        @type file
        path /var/log/fluent/buffer/system
        flush_interval 30s
      </buffer>
    </match>
    
    # Application logs
    <match app.**>
      @type elasticsearch
      host elasticsearch.logging.svc.cluster.local
      index_name k8s-app-logs
    
      <buffer>
        @type file
        path /var/log/fluent/buffer/app
        flush_interval 10s
      </buffer>
    </match>
    ANSI

    19.3 Multi-tenant SaaS Logging

    # Multi-tenant logging with tenant isolation
    <source>
      @type forward
      port 24224
      bind 0.0.0.0
    </source>
    
    # Extract tenant information
    <filter **>
      @type record_transformer
      enable_ruby true
    
      <record>
        # Derive the tenant from the tag, then fall back to record fields
        # (keep each expression on one line; a config value ends at the newline)
        tenant_id ${tag[/^tenant\.([^.]+)\./, 1] || record["tenant_id"] || record["organization_id"] || "unknown"}
    
        # Security: ensure tenant isolation by allowing only known tenants
        # (placeholders are expanded against the incoming record, so re-derive
        #  the tenant here instead of referencing the tenant_id field above)
        tenant_validated ${t = tag[/^tenant\.([^.]+)\./, 1] || record["tenant_id"] || record["organization_id"]; ["tenant-a", "tenant-b", "tenant-c"].include?(t) ? t : "invalid"}
      </record>
    </filter>
    
    # Reject invalid tenants
    <filter **>
      @type grep
      <exclude>
        key tenant_validated
        pattern ^invalid$
      </exclude>
    </filter>
    
    # Route to tenant-specific outputs
    <match **>
      @type rewrite_tag_filter
    
      <rule>
        key tenant_validated
        pattern ^tenant-a$
        tag routed.tenant-a.${tag}
      </rule>
    
      <rule>
        key tenant_validated
        pattern ^tenant-b$
        tag routed.tenant-b.${tag}
      </rule>
    
      <rule>
        key tenant_validated
        pattern ^tenant-c$
        tag routed.tenant-c.${tag}
      </rule>
    </match>
    
    # Tenant A - Premium tier
    <match routed.tenant-a.**>
      @type copy
    
      <store>
        @type elasticsearch
        host tenant-a-es.example.com
        index_name tenant-a-logs
    
        <buffer>
          chunk_limit_size 64MB
          flush_interval 5s
        </buffer>
      </store>
    
      <store>
        @type s3
        s3_bucket tenant-a-archive
        path logs/
      </store>
    </match>
    
    # Tenant B - Standard tier
    <match routed.tenant-b.**>
      @type elasticsearch
      host shared-es.example.com
      index_name tenant-b-logs
    
      <buffer>
        chunk_limit_size 32MB
        flush_interval 30s
      </buffer>
    </match>
    
    # Tenant C - Basic tier
    <match routed.tenant-c.**>
      @type file
      path /var/log/fluent/tenant-c/logs.%Y%m%d.log
    
      <format>
        @type json
      </format>
    
      <buffer time>
        timekey 86400
        timekey_wait 3600
      </buffer>
    </match>
    ANSI

    19.4 High-Availability Production Setup

    # Production HA configuration
    <system>
      workers 4
      root_dir /var/log/fluent
      log_level info
      suppress_repeated_stacktrace true
    </system>
    
    # Secure input from forwarders
    <source>
      @type forward
      port 24224
      bind 0.0.0.0
    
      <transport tls>
        cert_path /etc/fluent/certs/server.crt
        private_key_path /etc/fluent/certs/server.key
        ca_cert_path /etc/fluent/certs/ca.crt
        client_cert_auth true
      </transport>
    
      <security>
        self_hostname "#{ENV['HOSTNAME']}"
        shared_key "#{ENV['FLUENT_SHARED_KEY']}"
      </security>
    </source>
    
    # Health check endpoint
    <source>
      @type http
      port 8888
      bind 0.0.0.0
    </source>
    
    # Monitoring endpoints
    <source>
      @type monitor_agent
      port 24220
      bind 0.0.0.0
    </source>
    
    <source>
      @type prometheus
      port 24231
      bind 0.0.0.0
    </source>
    
    # Process all application logs
    <match app.**>
      @type copy
    
      # Primary: Real-time search
      <store>
        @type elasticsearch
        hosts ["es1.example.com:9200", "es2.example.com:9200", "es3.example.com:9200"]
        scheme https
        ssl_verify true
    
        index_name app-logs
        logstash_format true
        logstash_prefix app-logs
    
        <buffer>
          @type file
          path /var/log/fluent/buffer/elasticsearch
          chunk_limit_size 32MB
          queue_limit_length 256
          flush_mode interval
          flush_interval 10s
          retry_type exponential_backoff
          retry_forever true
        </buffer>
    
        <secondary>
          @type relabel
          @label @ES_BACKUP
        </secondary>
      </store>
    
      # Secondary: Long-term storage
      <store>
        @type s3
        s3_bucket production-logs
        s3_region us-west-2
    
        path logs/year=%Y/month=%m/day=%d/hour=%H/
        s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
    
        <format>
          @type json
        </format>
    
        store_as gzip
    
        <buffer time>
          @type file
          path /var/log/fluent/buffer/s3
          timekey 3600
          timekey_wait 300
          chunk_limit_size 256MB
        </buffer>
      </store>
    
      # Tertiary: Metrics
      <store>
        @type prometheus
        <metric>
          name app_logs_total
          type counter
          desc Total application logs
          <labels>
            service ${record["service"]}
            level ${record["level"]}
            environment production
          </labels>
        </metric>
      </store>
    </match>
    
    # Elasticsearch backup path
    <label @ES_BACKUP>
      <match **>
        @type s3
        s3_bucket emergency-logs
        path elasticsearch-failures/
    
        <format>
          @type json
        </format>
      </match>
    </label>
    
    # Error alerts
    <match error.**>
      @type slack
      webhook_url "#{ENV['SLACK_WEBHOOK_URL']}"
      channel production-alerts
      username fluentd
      icon_emoji :warning:
    
      message "Production Error: %s"
      message_keys message
    
      <buffer>
        flush_mode immediate
        retry_max_times 3
      </buffer>
    </match>
    
    # Fallback for unmatched logs
    <match **>
      @type file
      path /var/log/fluent/unmatched/events.%Y%m%d.log
    
      <format>
        @type json
      </format>
    
      <buffer time>
        timekey 86400
      </buffer>
    </match>
    ANSI

    This comprehensive guide provides practical, production-ready configurations and patterns that can be adapted to various use cases and environments. Each template includes best practices for performance, reliability, and security.

