A comprehensive guide to mastering Fluentd for unified logging and data collection
Prerequisites and Learning Path
Prerequisites
- Basic understanding of system administration (Linux/Windows)
- Familiarity with log files and logging concepts
- Basic knowledge of JSON and regular expressions
- Understanding of network protocols (HTTP, TCP, UDP)
- Basic command-line skills
Learning Paths
Beginner Path (0-2 weeks)
Complete chapters 1-4, then 6 (basic outputs), 8 (basic routing), and 15 (essential best practices)
Intermediate Path (2-6 weeks)
Follow beginner path, then add chapters 5, 7, 9, 11, and 14
Expert Path (6+ weeks)
Complete all chapters including advanced topics: 10, 12, 13
Table of Contents
Part I: Foundations
- Introduction to Fluentd
- Architecture and Core Concepts
- Installation and Quick Start
- Configuration Fundamentals
Part II: Core Components
- Input Plugins and Data Sources
- Output Plugins and Destinations
- Processing with Filters and Parsers
- Routing and Event Flow
Part III: Advanced Operations
- Buffer Management and Reliability
- Performance Optimization
- Monitoring and Troubleshooting
- Security and Compliance
Part IV: Expert Topics
Part V: Reference
1. Introduction to Fluentd
What is Fluentd?
Fluentd is an open-source data collector that provides a unified logging layer, enabling you to collect, process, and route log data from various sources to multiple destinations. Think of it as a “data pipeline” that sits between your applications and your data storage systems.
Why Fluentd?
Problem it Solves
Modern applications generate logs in different formats, locations, and volumes. Without a unified approach, you end up with:
- Multiple custom scripts for each log source
- Inconsistent log formats and timestamps
- Difficult troubleshooting across services
- Manual log shipping and rotation
Fluentd’s Solution
graph LR
A[Multiple Log Sources] --> B[Fluentd]
B --> C[Unified Processing]
C --> D[Multiple Destinations]
A1[Application Logs] --> B
A2[System Logs] --> B
A3[Database Logs] --> B
A4[Container Logs] --> B
B --> D1[Elasticsearch]
B --> D2[S3]
B --> D3[Kafka]
B --> D4[Monitoring Systems]
style B fill:#f9f,stroke:#333,stroke-width:2px
Key Features
1. Unified Logging
- Collects logs from heterogeneous sources
- Normalizes different log formats
- Provides consistent timestamp handling
- Enables centralized log management
2. Pluggable Architecture
- 500+ community-contributed plugins
- Easy to extend and customize
- Supports virtually any input/output system
- Modular design for specific use cases
3. Reliable Data Delivery
- Built-in buffering mechanisms
- Automatic retry with exponential backoff
- At-least-once delivery guarantee
- Graceful handling of downstream failures
4. High Performance
- Memory-efficient processing
- Optimized for high-throughput scenarios
- Horizontal scaling capabilities
- Low-latency data forwarding
5. JSON-Native Processing
- Native JSON support throughout pipeline
- Schema-less data handling
- Easy field manipulation and enrichment
- Efficient serialization/deserialization
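To make the reliability features above concrete, here is a minimal, hedged sketch of an output that uses file buffering and exponential-backoff retries (the tag and destination host are illustrative; Chapter 9 covers these options in depth):
<match app.**>
  @type forward                     # forward events to another Fluentd instance
  <server>
    host aggregator.example.com     # illustrative hostname
    port 24224
  </server>
  <buffer>
    @type file                      # buffered chunks survive restarts
    path /var/log/fluent/buffer/app
    flush_interval 5s
    retry_type exponential_backoff  # automatic retry with growing wait times
    retry_max_times 17
  </buffer>
</match>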
Comparison with Other Tools
Fluentd vs. Logstash
graph TB
subgraph "Fluentd"
F1[Ruby-based]
F2[Lower Memory Usage]
F3[Simpler Configuration]
F4[Better for Forwarding]
end
subgraph "Logstash"
L1[JRuby-based]
L2[Higher Memory Usage]
L3[More Complex Features]
L4[Better for Processing]
end
style F1 fill:#e8f5e8
style L1 fill:#fff3e0
| Feature | Fluentd | Logstash | Use When |
|---|---|---|---|
| Memory Usage | Lower (~40MB) | Higher (~200MB) | Resource-constrained environments |
| Configuration | Simple, declarative | More complex, procedural | Simple forwarding vs complex processing |
| Plugin Ecosystem | 500+ plugins | 200+ plugins | Need specific integrations |
| Performance | High throughput | High processing power | Volume vs transformation focus |
Fluentd vs. Filebeat/Fluent Bit
- Fluentd: Full-featured log processor and router
- Filebeat/Fluent Bit: Lightweight log shippers
- Use Together: Filebeat/Fluent Bit → Fluentd → Storage
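For the "use together" pattern, the lightweight shipper runs on every node and forwards to a central Fluentd aggregator. A minimal aggregator-side sketch, assuming the shippers send over the forward protocol on port 24224 (Fluent Bit can do this with its forward output); the file output is a stand-in for your real destination:
<source>
  @type forward        # receive events from Fluent Bit or other Fluentd nodes
  port 24224
  bind 0.0.0.0
</source>
<match **>
  @type file           # stand-in for Elasticsearch, S3, Kafka, etc.
  path /var/log/fluent/aggregated
</match>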
When to Use Fluentd
✅ Ideal Use Cases
- Multi-service Architectures: Microservices, containers, distributed systems
- Cloud-native Applications: Kubernetes, Docker, serverless functions
- Data Lake Ingestion: Collecting data for analytics and ML pipelines
- Real-time Monitoring: Feeding monitoring and alerting systems
- Compliance Logging: Audit trails, regulatory requirements
- Log Aggregation: Centralizing logs from multiple sources
⚠️ Consider Alternatives When
- Simple Single-source Logging: Basic file rotation might suffice
- Extremely High-volume Streams: Kafka Connect might be better
- Heavy Data Transformation: Logstash might be more suitable
- Resource-critical Environments: Fluent Bit might be better
Real-World Use Cases
Case Study 1: E-commerce Platform
graph TD
A[Web Applications] --> F[Fluentd]
B[Payment Service] --> F
C[Inventory System] --> F
D[User Analytics] --> F
F --> E[Elasticsearch - Search/Debug]
F --> G[S3 - Long-term Storage]
F --> H[Kafka - Real-time Analytics]
F --> I[Slack - Alerts]
style F fill:#f9f,stroke:#333,stroke-width:2px
Benefits Achieved:
- Reduced MTTR from 2 hours to 15 minutes
- Unified view across 20+ microservices
- Real-time fraud detection through log analysis
- Automated compliance reporting
Case Study 2: IoT Data Pipeline
graph LR
A[IoT Devices] --> B[Edge Fluentd]
B --> C[Central Fluentd]
C --> D[Time Series DB]
C --> E[ML Pipeline]
C --> F[Dashboard]
Configuration Example:
# Edge processing
<source>
@type mqtt
host sensor-gateway.local
topic sensors/+/data
</source>
<filter sensors.**>
@type record_transformer
<record>
location ${tag_parts[1]}
timestamp ${Time.now.utc.iso8601}
</record>
</filter>
<match sensors.**>
@type forward
<server>
host central-fluentd.example.com
port 24224
</server>
</match>
Getting Started Checklist
Before diving into Fluentd, ensure you have:
- Environment Setup: Linux/Windows/macOS with admin access
- Log Sources Identified: Know what logs you want to collect
- Destinations Planned: Where do you want to send logs?
- Basic Requirements: Understand volume, retention, and performance needs
- Network Access: Ensure connectivity between sources and destinations
Next Steps
Now that you understand what Fluentd is and when to use it, let’s dive into its architecture and core concepts in the next chapter.
2. Architecture and Core Concepts
Fluentd Architecture
graph TD
A[Input Plugins] --> B[Engine]
B --> C[Router]
C --> D[Filter Plugins]
D --> E[Output Plugins]
E --> F[Buffer]
F --> G[Destination]
H[Parser] --> B
I[Formatter] --> E
style B fill:#e1f5fe
style C fill:#f3e5f5
style F fill:#fff3e0
Core Components
1. Input Plugins
- Receive or pull data from external sources
- Examples: `tail`, `forward`, `http`, `syslog`
2. Parser Plugins
- Parse incoming data into structured format
- Examples: `json`, `regexp`, `csv`, `apache2`
3. Filter Plugins
- Modify, enrich, or filter event streams
- Examples: `grep`, `record_transformer`, `geoip`
4. Output Plugins
- Send processed data to destinations
- Examples: `elasticsearch`, `s3`, `kafka`, `mongodb`
5. Formatter Plugins
- Format data before sending to destinations
- Examples: `json`, `csv`, `msgpack`, `ltsv`
6. Buffer Plugins
- Improve performance and reliability
- Examples: `memory`, `file`
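To see how these plugin types compose, here is a minimal end-to-end sketch (file paths and the tag are illustrative) that uses one plugin of each kind:
<source>
  @type tail                      # input plugin
  path /var/log/example/app.log   # illustrative path
  pos_file /var/log/fluent/example.pos
  tag example.app
  <parse>
    @type json                    # parser plugin
  </parse>
</source>
<filter example.app>
  @type record_transformer        # filter plugin
  <record>
    hostname ${hostname}
  </record>
</filter>
<match example.app>
  @type file                      # output plugin
  path /var/log/fluent/example
  <format>
    @type json                    # formatter plugin
  </format>
  <buffer>
    @type memory                  # buffer plugin
    flush_interval 10s
  </buffer>
</match>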
Event Flow
sequenceDiagram
participant Source
participant Input
participant Engine
participant Filter
participant Output
participant Destination
Source->>Input: Raw Data
Input->>Engine: Parsed Event
Engine->>Filter: Tagged Event
Filter->>Output: Processed Event
Output->>Destination: Formatted Data
Event Structure
Every Fluentd event consists of:
- Tag: Identifier for routing (e.g., `myapp.access`)
- Time: Event timestamp
- Record: JSON object containing the actual data
{
"tag": "myapp.access",
"time": 1634567890,
"record": {
"host": "192.168.1.100",
"method": "GET",
"path": "/api/users",
"status": 200,
"response_time": 0.123
}
}
3. Installation and Quick Start
3.1 Installation Methods
Method 1: Package Installation (Recommended for Production)
Ubuntu/Debian:
curl -fsSL https://toolbelt.treasuredata.com/sh/install-ubuntu-focal-fluent-package5.sh | sh
sudo systemctl start fluentd
sudo systemctl enable fluentd
CentOS/RHEL:
curl -fsSL https://toolbelt.treasuredata.com/sh/install-redhat-fluent-package5.sh | sh
sudo systemctl start fluentd
sudo systemctl enable fluentd
Windows:
# Using Chocolatey
choco install fluentd
# Or download MSI from official website
Method 2: Docker Installation (Recommended for Development)
# Pull official image
docker pull fluentd:latest
# Create config directory
mkdir -p ./fluentd/etc
# Run with custom config
docker run -d \
--name fluentd \
-p 24224:24224 \
-v $(pwd)/fluentd/etc:/fluentd/etc \
-v /var/log:/var/log:ro \
fluentd:latest
Method 3: Gem Installation (Development/Testing)
gem install fluentd
fluentd --setup ./fluent
fluentd -c ./fluent/fluent.conf -vv &
3.2 Directory Structure
/etc/fluent/
├── fluent.conf # Main configuration file
├── plugin/ # Custom plugins directory
├── conf.d/ # Additional configuration files
└── certs/ # SSL certificates
/var/log/fluent/
├── buffer/ # Buffer storage
├── failed/ # Failed events
└── fluentd.log # Fluentd process logs
3.3 Quick Start Tutorial
Let’s create a simple logging pipeline that demonstrates Fluentd’s core concepts.
Step 1: Basic Configuration
Create a basic configuration file:
# fluent.conf
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
<match **>
@type stdout
</match>
Step 2: Test the Setup
# Start Fluentd
fluentd -c fluent.conf -vv
# In another terminal, send a test message
echo '{"message":"Hello Fluentd!","timestamp":"'$(date -Iseconds)'"}' | \
fluent-cat test.logs
Expected output:
2024-01-01 12:00:00.000000000 +0000 test.logs: {"message":"Hello Fluentd!","timestamp":"2024-01-01T12:00:00+00:00"}
Step 3: Add File Input
Modify the configuration to read from log files:
# fluent.conf
<source>
@type tail
path /var/log/sample.log
pos_file /var/log/fluent/sample.log.pos
tag sample.logs
format json
</source>
<match sample.logs>
@type stdout
</match>
Create a test log file:
echo '{"level":"INFO","message":"Application started","user_id":123}' >> /var/log/sample.log
echo '{"level":"ERROR","message":"Database connection failed","error_code":500}' >> /var/log/sample.logANSIStep 4: Add Filtering
Add a filter to process only ERROR level logs:
# fluent.conf
<source>
@type tail
path /var/log/sample.log
pos_file /var/log/fluent/sample.log.pos
tag sample.logs
format json
</source>
<filter sample.logs>
@type grep
<regexp>
key level
pattern ERROR
</regexp>
</filter>
<filter sample.logs>
@type record_transformer
<record>
hostname ${hostname}
processed_at ${Time.now.utc.iso8601}
</record>
</filter>
<match sample.logs>
@type stdout
</match>
Step 5: Multiple Outputs
Send logs to both console and file:
# fluent.conf
<source>
@type tail
path /var/log/sample.log
pos_file /var/log/fluent/sample.log.pos
tag sample.logs
format json
</source>
<filter sample.logs>
@type record_transformer
<record>
hostname ${hostname}
processed_at ${Time.now.utc.iso8601}
</record>
</filter>
<match sample.logs>
@type copy
<store>
@type stdout
</store>
<store>
@type file
path /var/log/fluent/processed.log
format json
</store>
</match>
3.4 Hands-on Lab: Complete Logging Pipeline
Objective: Create a complete logging pipeline that collects, processes, and routes logs from a web application.
Scenario: You have a web application that generates access logs and error logs. You want to:
- Collect both types of logs
- Parse and enrich them
- Send access logs to Elasticsearch
- Send error logs to Slack alerts
- Archive all logs to S3
Lab Setup:
- Create sample log files:
# Access logs
mkdir -p /var/log/webapp
cat > /var/log/webapp/access.log << 'EOF'
192.168.1.100 - - [19/Sep/2024:10:00:00 +0000] "GET /api/users HTTP/1.1" 200 1234
192.168.1.101 - - [19/Sep/2024:10:00:01 +0000] "POST /api/login HTTP/1.1" 200 567
192.168.1.102 - - [19/Sep/2024:10:00:02 +0000] "GET /api/orders HTTP/1.1" 404 89
EOF
# Error logs
cat > /var/log/webapp/error.log << 'EOF'
{"timestamp":"2024-09-19T10:00:00Z","level":"ERROR","message":"Database connection timeout","service":"user-api"}
{"timestamp":"2024-09-19T10:00:01Z","level":"WARN","message":"High memory usage detected","service":"order-api"}
{"timestamp":"2024-09-19T10:00:02Z","level":"ERROR","message":"Payment gateway unreachable","service":"payment-api"}
EOF
- Create the Fluentd configuration:
# webapp-logging.conf
<system>
log_level info
</system>
# Collect access logs
<source>
@type tail
path /var/log/webapp/access.log
pos_file /var/log/fluent/access.log.pos
tag webapp.access
<parse>
@type apache2
key_name message
</parse>
</source>
# Collect error logs
<source>
@type tail
path /var/log/webapp/error.log
pos_file /var/log/fluent/error.log.pos
tag webapp.error
<parse>
@type json
</parse>
</source>
# Enrich all logs
<filter webapp.**>
@type record_transformer
<record>
environment production
application webapp
processed_at ${Time.now.utc.iso8601}
</record>
</filter>
# Route access logs to "elasticsearch"
<match webapp.access>
@type file
path /var/log/fluent/elasticsearch.log
append true
<format>
@type json
</format>
</match>
# Route error logs to "slack"
<match webapp.error>
@type file
path /var/log/fluent/slack-alerts.log
append true
<format>
@type json
</format>
</match>
- Run the lab:
# Start Fluentd
fluentd -c webapp-logging.conf -vv
# In another terminal, append new logs
echo '192.168.1.103 - - [19/Sep/2024:10:00:03 +0000] "DELETE /api/users/123 HTTP/1.1" 500 234' >> /var/log/webapp/access.log
echo '{"timestamp":"2024-09-19T10:00:03Z","level":"ERROR","message":"Disk space critical","service":"storage-api"}' >> /var/log/webapp/error.log
# Check outputs
tail -f /var/log/fluent/elasticsearch.log
tail -f /var/log/fluent/slack-alerts.log
Expected Results:
- Access logs should appear in `/var/log/fluent/elasticsearch.log` with enriched fields
- Error logs should appear in `/var/log/fluent/slack-alerts.log` with enriched fields
- Both should include the environment, application, and processed_at fields
3.5 Validation and Testing
Configuration Validation
# Dry run to check configuration syntax
fluentd -c fluent.conf --dry-run
# Check for plugin availability
fluentd --show-plugin-config=input:tail
Performance Testing
# Generate test data
for i in {1..1000}; do
echo '{"id":'$i',"message":"Test message '$i'","timestamp":"'$(date -Iseconds)'"}' | \
fluent-cat test.performance
done
# Monitor performance
curl http://localhost:24220/api/plugins.json | jq '.plugins[] | select(.type=="input")'
3.6 Common Installation Issues
| Issue | Symptoms | Solution |
|---|---|---|
| Permission denied | Cannot write to log directory | sudo chown -R fluentd:fluentd /var/log/fluent |
| Port already in use | Address already in use (bind) | `sudo netstat -tulpn \| grep 24224` to find the conflicting process |
| Plugin not found | Unknown input plugin ‘xyz’ | fluent-gem install fluent-plugin-xyz |
| Out of memory | Process killed, high memory usage | Reduce buffer sizes in configuration |
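For the out-of-memory row, "reduce buffer sizes" might look like the following hedged sketch (the limits are illustrative starting points, not recommendations; see the buffer chapter for tuning guidance):
<match app.**>
  @type elasticsearch
  host elasticsearch.example.com   # illustrative destination
  <buffer>
    @type file                     # file buffer keeps queued chunks off the heap
    path /var/log/fluent/buffer/app
    chunk_limit_size 8MB           # smaller chunks
    queue_limit_length 16          # shorter queue
  </buffer>
</match>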
Next Steps
Now that you have Fluentd installed and running, let’s dive deeper into configuration fundamentals in the next chapter.
4. Configuration Basics
Configuration File Structure
Fluentd uses a declarative configuration format with the following directives:
# Comments start with #
# Source directive - input plugins
<source>
@type [plugin_name]
# plugin parameters
</source>
# Match directive - output plugins
<match [tag_pattern]>
@type [plugin_name]
# plugin parameters
</match>
# Filter directive - filter plugins
<filter [tag_pattern]>
@type [plugin_name]
# plugin parameters
</filter>
# System directive - global settings
<system>
# system parameters
</system>
Tag Patterns
graph LR
A["Tag: myapp.web.access"] --> B{Pattern Matching}
B --> C["myapp.**"]
B --> D["*.web.*"]
B --> E["myapp.web.access"]
B --> F["**"]
style A fill:#e3f2fd
style C fill:#e8f5e8
style D fill:#e8f5e8
style E fill:#e8f5e8
style F fill:#e8f5e8
Pattern Examples:
- `*`: Matches any single tag part
- `**`: Matches zero or more tag parts
- `{app1,app2}`: Matches either app1 or app2
- `myapp.{web,api}.**`: Matches myapp.web.** or myapp.api.**
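As a quick illustration (the tags are hypothetical), these patterns are used directly in match and filter directives and are evaluated top to bottom:
<match myapp.{web,api}.**>
  @type stdout        # matches myapp.web.access, myapp.api.error, etc.
</match>
<match **>
  @type null          # catch-all for anything not matched above
</match>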
Common Parameters
Time Format
time_format %Y-%m-%d %H:%M:%S
time_format_fallbacks %Y-%m-%d %H:%M:%S %z, %Y-%m-%d %H:%M:%S
Include Files
@include /etc/fluent/conf.d/*.conf
Environment Variables
host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"ANSIExample: Complete Web Application Configuration
# fluent.conf
<system>
log_level info
</system>
# Include additional configurations
@include conf.d/*.conf
# Web server access logs
<source>
@type tail
path /var/log/nginx/access.log
pos_file /var/log/fluent/nginx.access.log.pos
tag nginx.access
format nginx
</source>
# Application logs
<source>
@type tail
path /var/log/myapp/*.log
pos_file /var/log/fluent/myapp.log.pos
tag myapp.logs
format json
</source>
# Filter to add hostname
<filter **>
@type record_transformer
<record>
hostname ${hostname}
service_name myapp
</record>
</filter>
# Send to Elasticsearch
<match nginx.access>
@type elasticsearch
host elasticsearch.example.com
port 9200
index_name nginx-access
type_name _doc
</match>
<match myapp.logs>
@type elasticsearch
host elasticsearch.example.com
port 9200
index_name myapp-logs
type_name _doc
</match>
5. Input Plugins
Overview
Input plugins define how Fluentd receives data. They run as separate threads and emit events to the Fluentd engine.
graph TD
A[Log Files] --> B[tail]
C[HTTP Requests] --> D[http]
E[Syslog] --> F[syslog]
G[TCP/UDP] --> H[forward]
I[Docker Containers] --> J[docker]
B --> K[Fluentd Engine]
D --> K
F --> K
H --> K
J --> K
style K fill:#f9f,stroke:#333,stroke-width:2px
5.1 Tail Plugin
Most commonly used for reading log files.
<source>
@type tail
path /var/log/nginx/access.log
pos_file /var/log/fluent/nginx.access.log.pos
tag nginx.access
<parse>
@type nginx
key_name message
</parse>
# Advanced options
read_from_head true
follow_inodes true
refresh_interval 60
rotate_wait 5
enable_watch_timer true
enable_stat_watcher true
open_on_every_update false
emit_unmatched_lines true
ignore_repeated_permission_error true
</source>
Key Parameters:
- `path`: File path (supports wildcards)
- `pos_file`: Position file to track reading progress
- `tag`: Tag assigned to events
- `read_from_head`: Start reading from beginning of file
- `follow_inodes`: Follow files even after rotation
5.2 Forward Plugin
Receives events from other Fluentd instances.
<source>
@type forward
port 24224
bind 0.0.0.0
# Security options
<security>
self_hostname fluentd-server
shared_key secret_string
</security>
# User authentication
<user>
username alice
password secret_password
</user>
</source>
5.3 HTTP Plugin
Receives events via HTTP.
<source>
@type http
port 9880
bind 0.0.0.0
body_size_limit 32m
keepalive_timeout 10s
# CORS support
cors_allow_origins ["*"]
cors_allow_methods ["POST"]
<parse>
@type json
json_parser yajl
</parse>
# Custom response
<response>
@type json
</response>
</source>
Usage:
curl -X POST -d 'json={"event":"data"}' http://localhost:9880/app.log
5.4 Syslog Plugin
Receives syslog messages.
<source>
@type syslog
port 5140
bind 0.0.0.0
tag system.logs
protocol_type udp
<parse>
@type syslog
message_format rfc5424
with_priority true
</parse>
</source>
5.5 Docker Plugin
Collects Docker container logs.
<source>
@type docker
docker_url unix:///var/run/docker.sock
# Container filtering
<filter>
container_name_regexp /^app-/
exclude_container_name_regexp /test/
</filter>
<parse>
@type json
</parse>
</source>
5.6 Windows Event Log Plugin
For Windows environments.
<source>
@type windows_eventlog
channels application,system,security
tag windows.eventlog
read_interval 2
<storage>
@type local
persistent true
path C:\fluent\eventlog.json
</storage>
</source>
5.7 Database Input
Reading from databases.
<source>
@type sql
host localhost
port 3306
database myapp
username fluentd
password secret
tag_prefix mysql.myapp
select_interval 60s
select_limit 500
<table>
table users
tag users
update_column updated_at
time_column created_at
</table>
</source>
6. Output Plugins
Overview
Output plugins define where Fluentd sends processed data. They support buffering and retry mechanisms for reliability.
graph TD
A[Fluentd Engine] --> B[Elasticsearch]
A --> C[AWS S3]
A --> D[Kafka]
A --> E[MongoDB]
A --> F[File System]
A --> G[HTTP Endpoint]
A --> H[Email/Slack]
style A fill:#f9f,stroke:#333,stroke-width:2px
6.1 Elasticsearch Output
Most popular output for log analytics.
<match app.**>
@type elasticsearch
host elasticsearch.example.com
port 9200
scheme https
user elastic
password changeme
# Index settings
index_name app-logs
type_name _doc
# Dynamic index naming
logstash_format true
logstash_prefix app-logs
logstash_dateformat %Y.%m.%d
# Template management
template_name app-template
template_file /etc/fluent/elasticsearch-template.json
template_overwrite true
# Performance tuning
bulk_message_request_threshold 1024
bulk_message_flush_interval 60s
# Retry and error handling
retry_forever true
retry_max_interval 30
retry_randomize false
# SSL configuration
ssl_verify true
ca_file /etc/ssl/certs/ca-certificates.crt
# Buffer configuration
<buffer>
@type file
path /var/log/fluent/elasticsearch
flush_mode interval
flush_interval 5s
chunk_limit_size 2M
queue_limit_length 8
retry_type exponential_backoff
retry_timeout 72h
</buffer>
</match>
6.2 Amazon S3 Output
For long-term log storage.
<match app.**>
@type s3
# AWS credentials (use IAM roles in production)
aws_key_id YOUR_AWS_KEY_ID
aws_sec_key YOUR_AWS_SECRET_KEY
s3_region us-west-2
s3_bucket my-app-logs
# Path and naming
path logs/year=%Y/month=%m/day=%d/hour=%H/
s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
# File format
<format>
@type json
</format>
# Compression
<compress>
@type gzip
</compress>
# Time slicing
time_slice_format %Y%m%d%H
time_slice_wait 10m
# Buffer settings
<buffer time>
@type file
path /var/log/fluent/s3
timekey 3600
timekey_wait 600
timekey_use_utc true
chunk_limit_size 256m
</buffer>
</match>
6.3 Apache Kafka Output
For real-time data streaming.
<match app.**>
@type kafka2
# Kafka cluster
brokers kafka1:9092,kafka2:9092,kafka3:9092
default_topic app-logs
# Topic routing
<topic_config>
app.web.** web-logs
app.api.** api-logs
app.db.** db-logs
</topic_config>
# Partitioning
default_partition_key_key hostname
partition_key_key user_id
# Message format
<format>
@type json
</format>
# Producer settings
acks all
compression_codec gzip
max_send_retries 3
required_acks 1
# Buffer configuration
<buffer topic>
@type file
path /var/log/fluent/kafka
flush_mode immediate
retry_type exponential_backoff
retry_forever true
</buffer>
</match>
6.4 MongoDB Output
For document-based storage.
<match app.**>
@type mongo
# Connection
host localhost
port 27017
database app_logs
collection logs
# Authentication
username fluentd
password secret
# Connection options
ssl true
ssl_cert /path/to/client.pem
ssl_key /path/to/client.key
ssl_ca_cert /path/to/ca.pem
# Document structure
capped true
capped_size 1GB
capped_max 1000000
# Time handling
time_key timestamp
localtime true
# Buffer settings
<buffer>
@type memory
flush_mode interval
flush_interval 10s
chunk_limit_size 8m
queue_limit_length 16
</buffer>
</match>
6.5 File Output
For local file storage.
<match app.**>
@type file
path /var/log/fluent/app.%Y%m%d
append true
# File format
<format>
@type json
time_key timestamp
time_format %Y-%m-%d %H:%M:%S %z
</format>
# Compression
compress gzip
# Time slicing
<buffer time>
@type file
path /var/log/fluent/buffer/app
timekey 1h
timekey_wait 5m
timekey_use_utc false
</buffer>
</match>
6.6 HTTP Output
For webhook-style integrations.
<match alert.**>
@type http
endpoint http://webhook.example.com/alerts
http_method post
# Headers
headers {"Content-Type":"application/json"}
# Authentication
auth basic
username webhook_user
password webhook_pass
# Format
<format>
@type json
</format>
# Retry configuration
retries 3
retry_wait 1s
retry_max_interval 5s
# SSL
tls_verify_mode peer
tls_ca_cert_path /etc/ssl/certs/ca-certificates.crt
</match>
6.7 Email/Slack Notifications
# Email output
<match critical.**>
@type mail
host smtp.gmail.com
port 587
user your_email@gmail.com
password your_password
enable_starttls_auto true
from your_email@gmail.com
to alerts@company.com
subject Critical Alert: %s
message %s
out_keys tag,time,message
</match>
# Slack output
<match alert.**>
@type slack
webhook_url https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
channel alerts
username fluentd
icon_emoji :warning:
message "Alert: %s"
message_keys message
color danger
title Alert Details
title_keys tag,hostname
</match>
7. Filter and Parser Plugins
Overview
Filter plugins modify event streams, while parser plugins structure incoming data.
graph LR
A[Raw Event] --> B[Parser Plugin]
B --> C[Structured Event]
C --> D[Filter Plugin]
D --> E[Enhanced Event]
E --> F[Output Plugin]
style B fill:#e8f5e8
style D fill:#fff3e0
7.1 Parser Plugins
JSON Parser
<source>
@type tail
path /var/log/app.log
tag app.logs
<parse>
@type json
json_parser yajl
time_key timestamp
time_format %Y-%m-%d %H:%M:%S %z
keep_time_key true
</parse>
</source>
Regexp Parser
<source>
@type tail
path /var/log/nginx/access.log
tag nginx.access
<parse>
@type regexp
expression /^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$/
time_format %d/%b/%Y:%H:%M:%S %z
</parse>
</source>
Multi-format Parser
<source>
@type tail
path /var/log/mixed.log
tag mixed.logs
<parse>
@type multi_format
<pattern>
format json
</pattern>
<pattern>
format regexp
expression /^(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(?<level>\w+)\] (?<message>.*)$/
time_format %Y-%m-%d %H:%M:%S
</pattern>
<pattern>
format none
</pattern>
</parse>
</source>
CSV Parser
<source>
@type tail
path /var/log/data.csv
tag csv.data
<parse>
@type csv
keys id,name,email,created_at
time_key created_at
time_format %Y-%m-%d %H:%M:%S
</parse>
</source>
7.2 Filter Plugins
Record Transformer
Add, modify, or remove fields.
<filter app.**>
@type record_transformer
enable_ruby true
<record>
hostname ${hostname}
tag ${tag}
timestamp ${time}
# Conditional fields
environment ${tag_parts[1] == "prod" ? "production" : "development"}
# Computed fields
response_time_ms ${record["response_time"].to_f * 1000}
# String manipulation
normalized_path ${record["path"].downcase.gsub(/\d+/, 'ID')}
# Add metadata
service_name myapp
version 1.2.3
</record>
# Remove sensitive fields
remove_keys password,secret_key
# Rename fields
rename_keys user:username,addr:ip_address
</filter>
Grep Filter
Filter events based on conditions.
# Include only specific levels
<filter app.**>
@type grep
<regexp>
key level
pattern ^(ERROR|WARN|FATAL)$
</regexp>
</filter>
# Exclude health check requests
<filter nginx.access>
@type grep
<exclude>
key path
pattern ^/health$
</exclude>
</filter>
# Multiple conditions (AND)
<filter app.**>
@type grep
<regexp>
key level
pattern ERROR
</regexp>
<regexp>
key service
pattern web
</regexp>
</filter>
# Multiple conditions (OR)
<filter app.**>
@type grep
<or>
<regexp>
key level
pattern ERROR
</regexp>
<regexp>
key level
pattern FATAL
</regexp>
</or>
</filter>
GeoIP Filter
Add geographical information based on IP addresses.
<filter nginx.access>
@type geoip
# GeoIP database
geoip_database /etc/fluent/GeoLite2-City.mmdb
# Field mapping
<record>
geoip_country ${city.country.iso_code}
geoip_city ${city.city.names.en}
geoip_latitude ${location.latitude}
geoip_longitude ${location.longitude}
geoip_region ${city.subdivisions.0.iso_code}
</record>
# IP field name
geoip_lookup_keys remote_addr,x_forwarded_for
# Skip private IPs
skip_adding_null_record true
</filter>
Kubernetes Metadata Filter
Enrich container logs with Kubernetes metadata.
<filter kubernetes.**>
@type kubernetes_metadata
# Kubernetes API
kubernetes_url https://kubernetes.default.svc.cluster.local
bearer_token_file /var/run/secrets/kubernetes.io/serviceaccount/token
ca_file /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
# Metadata to include
annotation_match [".*"]
de_dot false
de_dot_separator _
# Cache settings
cache_size 1000
cache_ttl 3600
</filter>
Throttle Filter
Rate limiting for events.
<filter app.**>
@type throttle
group_key kubernetes.container_name
group_bucket_period_s 60
group_bucket_limit 100
# What to do when throttled
group_drop_logs true
group_warning_delay_s 10
</filter>
Dedupe Filter
Remove duplicate events.
<filter app.**>
@type dedupe
key message,level
cache_ttl 300
cache_size 1000
</filter>
Concat Filter
Combine multi-line logs.
<filter app.**>
@type concat
key message
separator ""
# Java stack traces
multiline_start_regexp /^\d{4}-\d{2}-\d{2}/
multiline_end_regexp /^$/
continuous_line_regexp /^\s/
# Stream identity
stream_identity_key container_id
# Timeout for incomplete multiline
flush_interval 5s
timeout_label "@NORMAL"
</filter>
7.3 Advanced Filter Patterns
Conditional Processing
<filter app.**>
@type record_transformer
enable_ruby true
<record>
# Conditional field based on multiple conditions
alert_level ${
if record["status_code"].to_i >= 500
"critical"
elsif record["status_code"].to_i >= 400
"warning"
elsif record["response_time"].to_f > 1.0
"performance"
else
"normal"
end
}
# Dynamic routing tag
routing_tag ${
case record["service"]
when "web"
"app.web.processed"
when "api"
"app.api.processed"
else
"app.unknown.processed"
end
}
</record>
</filter>
Data Enrichment Pipeline
# Step 1: Parse user agent
<filter web.access>
@type parser
key_name user_agent
reserve_data true
inject_key_prefix ua_
<parse>
@type user_agent
</parse>
</filter>
# Step 2: Add GeoIP information
<filter web.access>
@type geoip
geoip_lookup_keys remote_addr
<record>
country ${city.country.iso_code}
city ${city.city.names.en}
</record>
</filter>
# Step 3: Classify requests
<filter web.access>
@type record_transformer
enable_ruby true
<record>
request_type ${
if record["path"] =~ /^\/api\//
"api"
elsif record["path"] =~ /\.(css|js|png|jpg|gif)$/
"static"
else
"page"
end
}
is_bot ${record["ua_name"] =~ /(bot|crawler|spider)/i ? true : false}
</record>
</filter>
8. Routing and Tagging
Overview
Fluentd’s routing system uses tags to determine how events flow through the pipeline.
graph TD
A[Input: myapp.web.access] --> B{Router}
B --> |myapp.web.**| C[Web Processing]
B --> |myapp.api.**| D[API Processing]
B --> |myapp.**| E[General Processing]
B --> |**| F[Catch All]
C --> G[Elasticsearch: web-logs]
D --> H[Kafka: api-events]
E --> I[S3: app-logs]
F --> J[File: unknown.log]
style B fill:#f9f,stroke:#333,stroke-width:2px
8.1 Tag Structure and Conventions
Hierarchical Tagging
application.component.environment.type
└─ myapp.web.prod.access
└─ myapp.api.staging.error
└─ myapp.db.prod.slow_query
└─ system.nginx.prod.access
└─ security.auth.prod.failed_login
Best Practices for Tag Naming
# Good: Hierarchical and descriptive
myapp.web.access
myapp.api.error
system.nginx.access
security.auth.failure
# Avoid: Flat or unclear naming
logs
web_logs
error_messages
8.2 Routing Patterns
Basic Routing
# Exact match
<match myapp.web.access>
@type elasticsearch
index_name web-access-logs
</match>
# Wildcard match
<match myapp.web.*>
@type file
path /var/log/web/%Y%m%d
</match>
# Multi-level wildcard
<match myapp.**>
@type s3
s3_bucket myapp-logs
</match>
# Multiple patterns
<match {nginx,apache}.access>
@type elasticsearch
index_name web-servers
</match>
Complex Routing Examples
# Route by environment
<match *.prod.**>
@type elasticsearch
host prod-elasticsearch.example.com
index_name prod-logs
</match>
<match *.staging.**>
@type elasticsearch
host staging-elasticsearch.example.com
index_name staging-logs
</match>
# Route by log level
<match **.error>
@type slack
webhook_url https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
channel alerts
</match>
<match **.{warn,error,fatal}>
@type elasticsearch
index_name error-logs
</match>
# Route by service
<match web.**>
@type kafka
default_topic web-events
</match>
<match api.**>
@type kafka
default_topic api-events
</match>
8.3 Tag Manipulation
Re-tagging Events
<match input.logs>
@type rewrite_tag_filter
<rule>
key level
pattern ERROR
tag error.${tag}
</rule>
<rule>
key level
pattern WARN
tag warning.${tag}
</rule>
<rule>
key level
pattern INFO
tag info.${tag}
</rule>
# Default case
<rule>
key level
pattern .*
tag processed.${tag}
</rule>
</match>
# Process re-tagged events
<match error.**>
@type slack
webhook_url YOUR_WEBHOOK_URL
channel alerts
</match>
<match {warning,info}.**>
@type elasticsearch
index_name application-logs
</match>
Dynamic Tagging
<filter app.**>
@type record_transformer
enable_ruby true
<record>
# Add routing information to record
routing_key ${
case record["severity"]
when "high"
"alert"
when "medium"
"warning"
else
"info"
end
}
</record>
</filter>
<match app.**>
@type rewrite_tag_filter
<rule>
key routing_key
pattern alert
tag alert.${tag}
</rule>
<rule>
key routing_key
pattern warning
tag warning.${tag}
</rule>
<rule>
key routing_key
pattern .*
tag processed.${tag}
</rule>
</match>
8.4 Multi-destination Routing
Copy Plugin
Send events to multiple outputs.
<match app.**>
@type copy
# Send to Elasticsearch for searching
<store>
@type elasticsearch
host elasticsearch.example.com
index_name app-logs
</store>
# Send to S3 for archival
<store>
@type s3
s3_bucket app-logs-archive
path logs/%Y/%m/%d/
</store>
# Send critical events to Slack
<store ignore_error>
@type grep
<regexp>
key level
pattern (ERROR|FATAL)
</regexp>
@type slack
webhook_url YOUR_WEBHOOK_URL
channel alerts
</store>
</match>
Conditional Copy
<match app.**>
@type copy
# Always send to main storage
<store>
@type elasticsearch
index_name app-logs
</store>
# Only send errors to alerting
<store>
@type grep
<regexp>
key level
pattern ERROR
</regexp>
@type slack
webhook_url YOUR_WEBHOOK_URL
</store>
# Send to different S3 buckets based on environment
<store>
@type grep
<regexp>
key environment
pattern production
</regexp>
@type s3
s3_bucket prod-logs
</store>
<store>
@type grep
<regexp>
key environment
pattern staging
</regexp>
@type s3
s3_bucket staging-logs
</store>
</match>
8.5 Label Routing
Advanced routing using labels for complex flows.
<source>
@type forward
port 24224
@label @mainstream
</source>
<source>
@type tail
path /var/log/secure
tag security.auth
@label @security
</source>
# Main processing pipeline
<label @mainstream>
<filter **>
@type record_transformer
<record>
processed_by mainstream
</record>
</filter>
<match app.**>
@type elasticsearch
index_name app-logs
</match>
<match system.**>
@type file
path /var/log/fluent/system.log
</match>
</label>
# Security-specific processing
<label @security>
<filter **>
@type grep
<regexp>
key message
pattern authentication failure
</regexp>
</filter>
<match **>
@type slack
webhook_url YOUR_WEBHOOK_URL
channel security-alerts
</match>
</label>
# Error handling label
<label @ERROR>
<match **>
@type file
path /var/log/fluent/error.log
</match>
</label>
8.6 Routing Best Practices
1. Consistent Tag Hierarchy
# Use consistent naming conventions
service.component.environment.type
myapp.web.prod.access
myapp.api.prod.error
myapp.db.prod.query
# Avoid mixing conventions
myapp.web.access.prod # Inconsistent
web_prod_access # Flat structure
2. Performance Considerations
# Order match directives from specific to general
<match myapp.web.prod.error> # Most specific
@type slack
</match>
<match myapp.web.**> # More general
@type elasticsearch
</match>
<match myapp.**> # General
@type s3
</match>
<match **> # Catch-all (last)
@type file
path /var/log/fluent/unmatched.log
</match>
3. Testing and Debugging Routes
# Add debug output to test routing
<match **>
@type copy
# Your normal output
<store>
@type elasticsearch
index_name app-logs
</store>
# Debug output (remove in production)
<store>
@type stdout
output_type json
</store>
</match>
9. Buffer Management
Overview
Buffering is crucial for Fluentd’s reliability and performance. It handles temporary storage, retry logic, and output delivery guarantees.
graph TB
A[Input Events] --> B[Buffer]
B --> C{Flush Condition Met?}
C -->|Yes| D[Output Plugin]
C -->|No| B
D --> E{Success?}
E -->|Yes| F[Acknowledge]
E -->|No| G[Retry Queue]
G --> H{Max Retries?}
H -->|No| D
H -->|Yes| I[Dead Letter Queue]
style B fill:#e3f2fd
style G fill:#fff3e0
style I fill:#ffebee
9.1 Buffer Types
Memory Buffer
Fast but volatile – data is lost on restart.
<match app.**>
@type elasticsearch
<buffer>
@type memory
chunk_limit_size 8MB
queue_limit_length 16
flush_mode interval
flush_interval 5s
overflow_action throw_exception
</buffer>
</match>
File Buffer
Persistent but slower – data survives restarts.
<match app.**>
@type elasticsearch
<buffer>
@type file
path /var/log/fluent/buffer/elasticsearch
chunk_limit_size 32MB
queue_limit_length 64
flush_mode interval
flush_interval 10s
overflow_action block
# File-specific settings
file_permission 0644
dir_permission 0755
</buffer>
</match>
9.2 Buffer Parameters
Core Buffer Settings
<buffer>
# Buffer type
@type file
path /var/log/fluent/buffer/app
# Chunk settings
chunk_limit_size 32MB # Max size per chunk
chunk_limit_records 1000 # Max records per chunk
# Queue settings
queue_limit_length 128 # Max chunks in queue
queued_chunks_limit_size 1GB # Max total queue size
# Flush settings
flush_mode interval # interval, immediate, lazy
flush_interval 10s # How often to flush
flush_at_shutdown true # Flush on shutdown
# Threading
flush_thread_count 2 # Parallel flush threads
# Overflow behavior
overflow_action block # block, throw_exception, drop_oldest_chunk
# Retry settings
retry_type exponential_backoff # exponential_backoff, periodic
retry_wait 1s # Initial retry wait
retry_max_interval 60s # Max retry interval
retry_max_times 17 # Max retry attempts
retry_forever false # Retry indefinitely
retry_timeout 72h # Give up after this time
# Secondary output on failure
retry_secondary_threshold 0.8 # When to try secondary
</buffer>
Time-based Buffering
<match app.**>
@type s3
<buffer time>
@type file
path /var/log/fluent/buffer/s3
# Time-based chunking
timekey 1h # 1 hour chunks
timekey_wait 5m # Wait 5min for late events
timekey_use_utc true # Use UTC timestamps
timekey_zone +0900 # Timezone for timekey
# Time boundaries
flush_mode interval
flush_interval 10m
</buffer>
</match>
Tag-based Buffering
<match app.**>
@type elasticsearch
<buffer tag>
@type file
path /var/log/fluent/buffer/elasticsearch
# Each tag gets separate chunks
chunk_limit_size 16MB
flush_mode immediate
</buffer>
</match>
Custom Key Buffering
<match app.**>
@type kafka
<buffer hostname,level>
@type memory
# Buffer by hostname and log level
chunk_limit_size 8MB
flush_mode interval
flush_interval 30s
</buffer>
</match>
9.3 Buffer Monitoring
Buffer Metrics
Fluentd exposes buffer metrics for monitoring:
<system>
enable_input_metrics true
enable_size_metrics true
</system>
# Monitor buffer metrics
<source>
@type monitor_agent
bind 0.0.0.0
port 24220
include_config false
</source>
# Export metrics to Prometheus
<source>
@type prometheus
bind 0.0.0.0
port 24231
metrics_path /metrics
</source>
<source>
@type prometheus_output_monitor
interval 10
<labels>
hostname ${hostname}
</labels>
</source>
Key metrics to monitor:
- `buffer_queue_length`: Number of chunks in queue
- `buffer_total_queued_size`: Total size of queued data
- `retry_count`: Number of retries
- `emit_count`: Number of events emitted
- `emit_size`: Size of emitted data
9.4 Advanced Buffer Configurations
High-throughput Configuration
<match app.**>
@type elasticsearch
<buffer>
@type file
path /var/log/fluent/buffer/high-throughput
# Large chunks for efficiency
chunk_limit_size 64MB
chunk_limit_records 100000
# More parallel processing
flush_thread_count 4
queue_limit_length 256
# Aggressive flushing
flush_mode immediate
flush_interval 1s
# Handle overflow
overflow_action drop_oldest_chunk
</buffer>
</match>
Low-latency Configuration
<match app.**>
@type elasticsearch
<buffer>
@type memory
# Small chunks for low latency
chunk_limit_size 1MB
chunk_limit_records 1000
# Immediate flushing
flush_mode immediate
# Minimal queuing
queue_limit_length 8
# Fast retries
retry_type exponential_backoff
retry_wait 0.1s
retry_max_interval 5s
</buffer>
</match>
Reliable Configuration
<match critical.**>
@type elasticsearch
<buffer>
@type file
path /var/log/fluent/buffer/critical
# Persistent storage
file_permission 0600
dir_permission 0700
# Conservative settings
chunk_limit_size 8MB
queue_limit_length 32
# Reliable flushing
flush_mode interval
flush_interval 30s
flush_at_shutdown true
# Extensive retries
retry_forever true
retry_max_interval 300s
# Block on overflow (don't lose data)
overflow_action block
</buffer>
# Secondary output for failures
<secondary>
@type file
path /var/log/fluent/failed/critical.%Y%m%d.log
<format>
@type json
</format>
</secondary>
</match>
9.5 Buffer Troubleshooting
Common Buffer Issues
- Buffer Overflow
# Symptoms: "buffer overflow" errors
# Solutions:
<buffer>
# Increase buffer size
queue_limit_length 256
chunk_limit_size 64MB
# Handle overflow gracefully
overflow_action drop_oldest_chunk
# Increase flush rate
flush_interval 5s
flush_thread_count 4
</buffer>
- Slow Draining
# Symptoms: Growing buffer queue
# Solutions:
<buffer>
# More parallel processing
flush_thread_count 8
# Larger chunks (fewer requests)
chunk_limit_size 32MB
# Immediate mode for faster processing
flush_mode immediate
</buffer>
- Memory Issues
# Symptoms: High memory usage
# Solutions:
<buffer>
# Use file buffer instead of memory
@type file
path /var/log/fluent/buffer
# Limit memory usage
chunk_limit_size 8MB
queue_limit_length 16
# Flush more frequently
flush_interval 10s
</buffer>
Buffer Debugging
# Enable debug logging
<system>
log_level debug
</system>
# Monitor buffer status
<source>
@type monitor_agent
bind 0.0.0.0
port 24220
</source>
# Add buffer metrics to logs
<filter **>
@type record_transformer
<record>
buffer_queue_length "#{Fluent::Plugin.new_buffer('memory').queue.length}"
</record>
</filter>
9.6 Buffer Best Practices
1. Choose Appropriate Buffer Type
# Use memory buffer for:
# - High-performance scenarios
# - Non-critical data
# - Development environments
# Use file buffer for:
# - Production environments
# - Critical data that cannot be lost
# - When restart durability is required
2. Size Buffers Appropriately
# Consider your data volume and patterns
<buffer>
# For high-volume logs (>1GB/day)
chunk_limit_size 32MB
queue_limit_length 128
# For low-volume logs (<100MB/day)
chunk_limit_size 8MB
queue_limit_length 32
</buffer>
3. Monitor Buffer Health
# Set up alerts for:
# - Buffer queue length > 80% of limit
# - Retry count increasing
# - Buffer overflow events
# - Flush failures
4. Plan for Failure Scenarios
<match critical.**>
@type elasticsearch
<buffer>
# Primary buffer configuration
@type file
path /var/log/fluent/buffer/critical
</buffer>
# Secondary output for failures
<secondary>
@type file
path /var/log/fluent/failed/critical.log
</secondary>
</match>
10. Performance Optimization
Overview
Optimizing Fluentd performance involves tuning multiple components: inputs, parsing, filtering, buffering, and outputs.
graph TD
A[Performance Bottlenecks] --> B[CPU Usage]
A --> C[Memory Usage]
A --> D[I/O Operations]
A --> E[Network Latency]
B --> F[Parser Optimization]
B --> G[Filter Reduction]
C --> H[Buffer Tuning]
C --> I[Memory Management]
D --> J[File Operations]
D --> K[Disk Storage]
E --> L[Batch Processing]
E --> M[Connection Pooling]
style A fill:#ffebee
style B fill:#e8f5e8
style C fill:#e3f2fd
style D fill:#fff3e0
style E fill:#f3e5f5
10.1 System-level Optimization
Operating System Tuning
# Increase file descriptor limits
echo "fluentd soft nofile 65536" >> /etc/security/limits.conf
echo "fluentd hard nofile 65536" >> /etc/security/limits.conf
# TCP buffer tuning
echo 'net.core.rmem_max = 134217728' >> /etc/sysctl.conf
echo 'net.core.wmem_max = 134217728' >> /etc/sysctl.conf
echo 'net.ipv4.tcp_rmem = 4096 65536 134217728' >> /etc/sysctl.conf
echo 'net.ipv4.tcp_wmem = 4096 65536 134217728' >> /etc/sysctl.conf
# Virtual memory settings
echo 'vm.max_map_count = 262144' >> /etc/sysctl.conf
echo 'vm.swappiness = 1' >> /etc/sysctl.conf
# Apply changes
sysctl -p
JVM Tuning (for JRuby)
export JAVA_OPTS="-Xmx2g -Xms2g -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
Process and Thread Limits
<system>
# Worker processes
workers 4
# Root directory for workers
root_dir /var/log/fluent
# Log settings
log_level info
suppress_repeated_stacktrace true
# Without sudo
without_source true
</system>
10.2 Input Optimization
Tail Plugin Optimization
<source>
@type tail
path /var/log/app/*.log
pos_file /var/log/fluent/app.log.pos
tag app.logs
# Performance tuning
read_from_head false
refresh_interval 60
rotate_wait 5
enable_watch_timer false # Disable for high-volume logs
enable_stat_watcher true # Use for better inode tracking
open_on_every_update false # Keep files open
# Reduce parsing overhead
<parse>
@type none # Skip parsing if not needed
</parse>
# Multi-line handling
read_lines_limit 1000 # Process more lines per iteration
read_bytes_limit_per_second -1 # No rate limiting
</source>
Forward Plugin Optimization
<source>
@type forward
port 24224
bind 0.0.0.0
# Connection settings
linger_timeout 0
chunk_size_limit 1MB
chunk_size_warn_limit 1MB
# Skip hostname resolution
resolve_hostname false
# Disable access logging for performance
deny_keepalive false
</source>
HTTP Plugin Optimization
<source>
@type http
port 9880
bind 0.0.0.0
# Performance settings
body_size_limit 32m
keepalive_timeout 10s
# Disable CORS if not needed
cors_allow_origins []
# Use efficient parser
<parse>
@type json
json_parser oj # Faster JSON parser
</parse>
</source>
10.3 Parser Optimization
Choose Efficient Parsers
# Fast: Pre-structured formats
<parse>
@type json
json_parser oj # Fastest JSON parser
</parse>
# Medium: Simple regex
<parse>
@type regexp
expression /^(?<time>\S+) (?<level>\S+) (?<message>.*)$/
time_format %Y-%m-%dT%H:%M:%S%z
</parse>
# Slow: Complex regex or multi-format
<parse>
@type multi_format # Avoid if possible
<pattern>
format json
</pattern>
<pattern>
format apache2
</pattern>
</parse>
Optimize Regular Expressions
# Good: Specific and anchored
expression /^(?<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z) \[(?<level>\w+)\] (?<message>.+)$/
# Bad: Greedy and unanchored
expression /.*(\d{4}-\d{2}-\d{2}.*) \[(.*)\] (.*)$/
# Better: Non-greedy when needed
expression /^(?<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z) \[(?<level>\w+?)\] (?<message>.+)$/
Skip Unnecessary Parsing
# Parse only when needed
<source>
@type tail
path /var/log/app.log
tag app.raw
<parse>
@type none # Don't parse at input
</parse>
</source>
# Parse later, only for specific outputs
<filter app.raw>
@type parser
key_name message
reserve_data true
<parse>
@type json
</parse>
</filter>
10.4 Filter Optimization
Minimize Filter Usage
# Bad: Multiple separate filters
<filter app.**>
@type record_transformer
<record>
hostname ${hostname}
</record>
</filter>
<filter app.**>
@type record_transformer
<record>
timestamp ${time}
</record>
</filter>
# Good: Single combined filter
<filter app.**>
@type record_transformer
<record>
hostname ${hostname}
timestamp ${time}
</record>
</filter>
Optimize Record Transformer
<filter app.**>
@type record_transformer
enable_ruby true
# Cache expensive operations
<record>
# Good: Simple operations
hostname ${hostname}
tag ${tag}
# Avoid: Complex computations in each event
# slow_operation ${record["data"].to_json.length}
# Better: Pre-compute when possible
service_name myapp
version 1.0.0
</record>
# Remove unnecessary fields early
remove_keys password,secret,internal_field
</filter>
Use Grep for Early Filtering
# Filter out unnecessary events early
<filter app.**>
@type grep
<exclude>
key path
pattern ^/health$
</exclude>
</filter>
# This reduces load on downstream filters and outputs
<filter app.**>
@type record_transformer
# ... expensive operations
</filter>
10.5 Buffer Optimization
High-throughput Buffer Configuration
<match app.**>
@type elasticsearch
<buffer>
@type file
path /var/log/fluent/buffer/high-throughput
# Large chunks for fewer requests
chunk_limit_size 64MB
chunk_limit_records 100000
# Parallel processing
flush_thread_count 8
# Large queue
queue_limit_length 512
queued_chunks_limit_size 2GB
# Immediate flushing
flush_mode immediate
# Handle overflow
overflow_action drop_oldest_chunk
# Optimized retries
retry_type exponential_backoff
retry_wait 1s
retry_max_interval 30s
retry_max_times 5
</buffer>
</match>
Memory vs File Buffer Trade-offs
# High-performance, low-durability
<buffer>
@type memory
chunk_limit_size 16MB
queue_limit_length 64
flush_mode immediate
</buffer>
# Balanced performance and durability
<buffer>
@type file
path /var/log/fluent/buffer
chunk_limit_size 32MB
queue_limit_length 128
flush_mode interval
flush_interval 5s
</buffer>
# High-durability, lower performance
<buffer>
@type file
path /var/log/fluent/buffer
chunk_limit_size 8MB
queue_limit_length 32
flush_mode interval
flush_interval 30s
flush_at_shutdown true
</buffer>
10.6 Output Optimization
Elasticsearch Optimization
<match app.**>
@type elasticsearch
host elasticsearch.example.com
port 9200
# Bulk settings for performance
bulk_message_request_threshold 5000
bulk_message_flush_interval 5s
# Connection optimization
reload_connections false
reload_after 100
# Disable expensive operations
include_tag_key false
include_timestamp false
# Template optimization
template_overwrite false
# Use efficient formatting
<format>
@type json
</format>
<buffer>
@type file
path /var/log/fluent/buffer/elasticsearch
chunk_limit_size 32MB
flush_thread_count 4
flush_mode immediate
</buffer>
</match>
Kafka Optimization
<match app.**>
@type kafka2
# Broker settings
brokers kafka1:9092,kafka2:9092,kafka3:9092
default_topic app-logs
# Producer optimization
acks 1 # Don't wait for all replicas
compression_codec snappy # Fast compression
batch_size 16384 # Larger batches
linger_ms 5 # Small delay for batching
# Partitioning
partition_key_key hostname # Distribute load
<buffer>
@type memory
chunk_limit_size 16MB
flush_mode immediate
flush_thread_count 4
</buffer>
</match>
S3 Optimization
<match app.**>
@type s3
aws_key_id YOUR_KEY
aws_sec_key YOUR_SECRET
s3_region us-west-2
s3_bucket app-logs
# Large objects for efficiency
s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
# Compression
<compress>
@type gzip
</compress>
# Storage class
storage_class STANDARD_IA # Cheaper for infrequent access
<buffer time>
@type file
path /var/log/fluent/buffer/s3
timekey 3600 # 1 hour chunks
timekey_wait 300 # 5 minute wait
chunk_limit_size 256MB # Large chunks
flush_thread_count 2
</buffer>
</match>
10.7 Monitoring Performance
Performance Metrics
# Enable metrics collection
<system>
enable_input_metrics true
enable_size_metrics true
</system>
# Monitor agent
<source>
@type monitor_agent
bind 0.0.0.0
port 24220
include_config false
</source>
# Prometheus metrics
<source>
@type prometheus
bind 0.0.0.0
port 24231
metrics_path /metrics
</source>
<source>
@type prometheus_output_monitor
interval 10
<labels>
hostname ${hostname}
service fluentd
</labels>
</source>
Key Performance Indicators
graph TB
A[Performance Metrics] --> B[Throughput]
A --> C[Latency]
A --> D[Resource Usage]
A --> E[Error Rates]
B --> F[Events/sec]
B --> G[Bytes/sec]
C --> H[Processing Time]
C --> I[Buffer Wait Time]
D --> J[CPU Usage]
D --> K[Memory Usage]
D --> L[Disk I/O]
E --> M[Parse Errors]
E --> N[Output Failures]
style A fill:#e3f2fd
style B fill:#e8f5e8
style C fill:#fff3e0
style D fill:#f3e5f5
style E fill:#ffebee
10.8 Performance Best Practices
1. Profile Your Configuration
# Use Fluentd's profiling tools
fluentd -c fluent.conf --dry-run
fluentd -c fluent.conf -vv --log-level debug
# Monitor system resources
top -p $(pgrep fluentd)
iotop -p $(pgrep fluentd)
2. Optimize Data Flow
# Bad: Inefficient pipeline
<source>
@type tail
path /var/log/app.log
tag app.logs
<parse>
@type json
</parse>
</source>
<filter app.logs>
@type record_transformer
<record>
hostname ${hostname}
</record>
</filter>
<filter app.logs>
@type grep
<regexp>
key level
pattern ERROR
</regexp>
</filter>
# Good: Optimized pipeline
<source>
@type tail
path /var/log/app.log
tag app.logs
<parse>
@type json
</parse>
</source>
<filter app.logs>
@type grep
<regexp>
key level
pattern ERROR
</regexp>
</filter>
<filter app.logs>
@type record_transformer
<record>
hostname ${hostname}
</record>
</filter>
11. Monitoring and Troubleshooting
Overview
Effective monitoring and troubleshooting are essential for maintaining a healthy Fluentd deployment.
graph TD
A[Monitoring Strategy] --> B[System Metrics]
A --> C[Application Metrics]
A --> D[Log Analysis]
A --> E[Alerting]
B --> F[CPU/Memory/Disk]
B --> G[Network I/O]
C --> H[Buffer Status]
C --> I[Event Throughput]
C --> J[Error Rates]
D --> K[Fluentd Logs]
D --> L[Output Validation]
E --> M[Threshold Alerts]
E --> N[Anomaly Detection]
style A fill:#e3f2fd
style B fill:#e8f5e8
style C fill:#fff3e0
style D fill:#f3e5f5
style E fill:#ffebee
11.1 Built-in Monitoring
Monitor Agent Plugin
<source>
@type monitor_agent
bind 0.0.0.0
port 24220
include_config false
include_retry false
</source>
Query monitor agent:
# Get plugin information
curl http://localhost:24220/api/plugins.json
# Get specific plugin info
curl http://localhost:24220/api/plugins/object:3fd8d4d4d638.json
# Get config dump
curl http://localhost:24220/api/config.json
Debug Event Dump
<source>
@type debug_agent
bind 0.0.0.0
port 24230
</source>
11.2 Metrics Collection
Prometheus Integration
# Install prometheus plugin
gem install fluent-plugin-prometheus
# Configuration
<source>
@type prometheus
bind 0.0.0.0
port 24231
metrics_path /metrics
</source>
<source>
@type prometheus_monitor
interval 10
<labels>
hostname ${hostname}
service fluentd
</labels>
</source>
<source>
@type prometheus_output_monitor
interval 10
<labels>
hostname ${hostname}
</labels>
</source>
<source>
@type prometheus_tail_monitor
interval 10
<labels>
hostname ${hostname}
</labels>
</source>
Custom Metrics
<filter app.**>
@type prometheus
<metric>
name fluentd_input_status_num_records_total
type counter
desc The total number of incoming records
<labels>
tag ${tag}
hostname ${hostname}
</labels>
</metric>
<metric>
name fluentd_input_status_num_bytes_total
type counter
desc The total bytes of incoming records
key size
<labels>
tag ${tag}
hostname ${hostname}
</labels>
</metric>
</filter>
11.3 Health Checks
HTTP Health Check Endpoint
<source>
@type http
port 9880
bind 0.0.0.0
<transport tls>
cert_path /path/to/cert.pem
private_key_path /path/to/key.pem
</transport>
</source>
# Add health check route
<match fluentd.health>
@type null
</match>
Health check script:
#!/bin/bash
# health_check.sh
# Check if Fluentd is running
if ! pgrep -f fluentd > /dev/null; then
echo "CRITICAL: Fluentd process not running"
exit 2
fi
# Check HTTP endpoint
if ! curl -f http://localhost:24220/api/plugins.json > /dev/null 2>&1; then
echo "CRITICAL: Monitor agent not responding"
exit 2
fi
# Check buffer sizes
BUFFER_SIZE=$(curl -s http://localhost:24220/api/plugins.json | jq '.plugins[] | select(.type=="output") | .buffer_queue_length' | awk '{sum+=$1} END {print sum}')
if [ "$BUFFER_SIZE" -gt 1000 ]; then
echo "WARNING: High buffer queue length: $BUFFER_SIZE"
exit 1
fi
echo "OK: Fluentd is healthy"
exit 0
11.4 Log Analysis
Fluentd Internal Logging
<system>
log_level info
# Separate log files
<log>
format json
time_format %Y-%m-%d %H:%M:%S %z
</log>
# Suppress repeated stack traces
suppress_repeated_stacktrace true
emit_error_log_interval 30s
suppress_config_dump true
# Log rotation
<log>
rotate_age 7
rotate_size 10MB
</log>
</system>
Important Log Patterns
# Monitor for these patterns in Fluentd logs:
# Buffer overflow
grep "buffer overflow" /var/log/fluent/fluentd.log
# Parse failures
grep "parse failed" /var/log/fluent/fluentd.log
# Output failures
grep "failed to flush" /var/log/fluent/fluentd.log
# Connection issues
grep "connection refused\|timeout" /var/log/fluent/fluentd.log
# Memory issues
grep "out of memory\|memory exhausted" /var/log/fluent/fluentd.logBash11.5 Common Issues and Solutions
Issue 1: High Memory Usage
graph TD
A[High Memory Usage] --> B{Check Buffer Type}
B -->|Memory Buffer| C[Switch to File Buffer]
B -->|File Buffer| D[Check Buffer Size]
D --> E[Reduce chunk_limit_size]
D --> F[Reduce queue_limit_length]
C --> G[Monitor Memory]
E --> G
F --> G
style A fill:#ffebee
style G fill:#e8f5e8
Symptoms:
- Fluentd process consuming excessive RAM
- System becoming unresponsive
- Out of memory errors in logs
Diagnosis:
# Check memory usage
ps aux | grep fluentd
top -p $(pgrep fluentd)
# Check buffer statistics
curl http://localhost:24220/api/plugins.json | jq '.plugins[] | select(.buffer_queue_length > 0)'
Solution:
# Replace memory buffer with file buffer
<match app.**>
@type elasticsearch
<buffer>
@type file
path /var/log/fluent/buffer
chunk_limit_size 8MB # Reduce from 32MB
queue_limit_length 32 # Reduce from 128
flush_mode interval
flush_interval 10s
</buffer>
</match>
Issue 2: Buffer Overflow
Symptoms:
- “buffer space has too many data” errors
- “buffer overflow” in logs
- Events being dropped
Root Causes:
- Downstream system too slow
- Network connectivity issues
- Insufficient buffer capacity
Diagnosis:
# Check buffer queue status
curl -s http://localhost:24220/api/plugins.json | \
jq '.plugins[] | select(.type=="output") | {plugin_id, buffer_queue_length, buffer_total_queued_size}'
# Monitor buffer growth
watch -n 5 'curl -s http://localhost:24220/api/plugins.json | jq ".plugins[] | select(.type==\"output\") | .buffer_queue_length"'
Solutions:
# Immediate fixes
<match app.**>
@type elasticsearch
<buffer>
# Increase buffer capacity
queue_limit_length 256 # Increase queue size
chunk_limit_size 32MB # Larger chunks
# Faster flushing
flush_mode immediate
flush_thread_count 4 # More parallel threads
# Handle overflow gracefully
overflow_action drop_oldest_chunk
</buffer>
</match>
# Long-term solutions
<match app.**>
@type copy
# Primary output with circuit breaker
<store>
@type elasticsearch
host primary-es.example.com
<buffer>
retry_max_times 3
retry_timeout 30s
</buffer>
<secondary>
@type relabel
@label @FALLBACK
</secondary>
</store>
</match>
<label @FALLBACK>
<match **>
@type s3
s3_bucket backup-logs
path fallback/
</match>
</label>
Issue 3: Parse Failures
Symptoms:
- “failed to parse” errors
- Missing fields in output
- Events with unexpected format
Common Scenarios:
- JSON Parse Errors:
# Problem: Malformed JSON
# Input: {"message": "Hello world", "level": INFO} # Missing quotes
# Solution: Use multiformat parser
<source>
@type tail
path /var/log/app.log
tag app.logs
<parse>
@type multi_format
<pattern>
format json
</pattern>
<pattern>
format regexp
expression /^(?<message>.*)$/
</pattern>
</parse>
</source>
- Regex Parse Errors:
# Problem: Regex doesn't match log format
# Solution: Test and fix regex
# Debug regex pattern
<source>
@type tail
path /var/log/nginx/access.log
tag nginx.access
<parse>
@type regexp
# Test with online regex tools first
expression /^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)/
time_format %d/%b/%Y:%H:%M:%S %z
</parse>
</source>
- Handle Parse Errors:
<source>
@type tail
path /var/log/app.log
tag app.logs
<parse>
@type json
# Send unparseable lines to error stream
emit_invalid_record_to_error true
</parse>
</source>
# Handle parse errors
<label @ERROR>
<match **>
@type file
path /var/log/fluent/parse_errors.log
<format>
@type json
</format>
</match>
</label>
Issue 4: Output Failures
Symptoms:
- “failed to flush” errors
- Events not reaching destination
- Retries exhausted
Elasticsearch Connection Issues:
# Problem: Connection refused, timeout
# Diagnosis commands:
# curl -X GET "elasticsearch.example.com:9200/_cluster/health"
# telnet elasticsearch.example.com 9200
<match app.**>
@type elasticsearch
host elasticsearch.example.com
port 9200
# Connection tuning
request_timeout 60s
reload_connections true
reload_after 500
# Retry configuration
<buffer>
retry_type exponential_backoff
retry_wait 1s
retry_max_interval 60s
retry_max_times 10
retry_forever false
retry_timeout 24h
</buffer>
# Secondary output for failures
<secondary>
@type file
path /var/log/fluent/failed_elasticsearch.log
<format>
@type json
</format>
</secondary>
</match>
Issue 5: Performance Degradation
Symptoms:
- High CPU usage
- Slow log processing
- Growing buffer queues
Performance Tuning Checklist:
1. Input Optimization:
<source>
@type tail
path /var/log/app/*.log
# Optimize for performance
read_from_head false
refresh_interval 60
rotate_wait 5
enable_watch_timer false # Disable for high-volume
enable_stat_watcher true # Better inode tracking
open_on_every_update false # Keep files open
read_lines_limit 1000 # Process more lines per batch
</source>
2. Filter Optimization:
# Bad: Multiple separate filters
<filter app.**>
@type record_transformer
<record>
hostname ${hostname}
</record>
</filter>
<filter app.**>
@type record_transformer
<record>
timestamp ${time}
</record>
</filter>
# Good: Combined filter
<filter app.**>
@type record_transformer
<record>
hostname ${hostname}
timestamp ${time}
</record>
</filter>
3. Output Optimization:
<match app.**>
@type elasticsearch
# Bulk settings
bulk_message_request_threshold 5000
bulk_message_flush_interval 5s
# Buffer tuning
<buffer>
@type file
chunk_limit_size 32MB # Larger chunks
flush_thread_count 4 # Parallel processing
flush_mode immediate # Faster processing
</buffer>
</match>
Issue 6: SSL/TLS Certificate Problems
Symptoms:
- “certificate verify failed” errors
- SSL handshake failures
- Connection refused with SSL
Solutions:
# Skip SSL verification (not recommended for production)
<match secure.**>
@type elasticsearch
scheme https
ssl_verify false
</match>
# Proper SSL configuration
<match secure.**>
@type elasticsearch
scheme https
ssl_verify true
ca_file /etc/ssl/certs/ca-certificates.crt
client_cert /etc/fluent/certs/client.crt
client_key /etc/fluent/certs/client.key
</match>
# Debug SSL issues
<match secure.**>
@type elasticsearch
scheme https
ssl_verify true
ssl_version TLSv1_2
log_es_400_reason true
</match>
11.6 Troubleshooting Workflow
Step-by-Step Debugging Process
1. Identify the Problem:
# Check Fluentd status
systemctl status fluentd
# Check logs
tail -f /var/log/fluent/fluentd.log
# Check configuration
fluentd -c /etc/fluent/fluent.conf --dry-run
2. Gather Information:
# System resources
top -p $(pgrep fluentd)
df -h /var/log/fluent
# Network connectivity
nc -zv elasticsearch.example.com 9200
ping elasticsearch.example.com
# Plugin status
curl http://localhost:24220/api/plugins.json | jq '.'
3. Isolate the Issue:
# Minimal test configuration
<source>
@type forward
port 24224
</source>
<match **>
@type stdout
</match>
4. Enable Debug Logging:
<system>
log_level debug
suppress_repeated_stacktrace false
</system>
5. Test Components Individually:
# Test input
echo '{"test":"message"}' | fluent-cat test.debug
# Test parsing
echo 'test log line' > /tmp/test.log
# Configure tail input to read from /tmp/test.log
# Test output
# Use stdout output to verify data flow
11.7 Performance Monitoring and Alerting
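Before building dashboards and alerts on top of these metrics, confirm that the Prometheus exporter is actually serving data. A quick spot-check against the /metrics endpoint configured below (assumes curl; exact series names can vary with the plugin version):
# List the Fluentd series exposed by fluent-plugin-prometheus
curl -s http://localhost:24231/metrics | grep -E '^fluentd_' | head -n 20
# Spot-check a specific series, e.g. an output buffer queue length
curl -s http://localhost:24231/metrics | grep 'buffer_queue_length'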
Key Metrics Dashboard
# Prometheus metrics configuration
<source>
@type prometheus
bind 0.0.0.0
port 24231
metrics_path /metrics
</source>
<source>
@type prometheus_monitor
interval 10
<labels>
hostname ${hostname}
service fluentd
</labels>
</source>
<source>
@type prometheus_output_monitor
interval 10
<labels>
hostname ${hostname}
</labels>
</source>
Critical Alerts Configuration
# prometheus-alerts.yml
groups:
- name: fluentd.rules
rules:
- alert: FluentdDown
expr: up{job="fluentd"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Fluentd instance {{ $labels.instance }} is down"
description: "Fluentd has been down for more than 1 minute"
- alert: FluentdHighBufferUsage
expr: fluentd_status_buffer_queue_length > 1000
for: 5m
labels:
severity: warning
annotations:
summary: "High buffer usage on {{ $labels.instance }}"
description: "Buffer queue length is {{ $value }}"
- alert: FluentdHighErrorRate
expr: rate(fluentd_status_retry_count[5m]) > 10
for: 2m
labels:
severity: warning
annotations:
summary: "High error rate on {{ $labels.instance }}"
description: "Error rate is {{ $value }} errors/second"
- alert: FluentdNoDataFlow
expr: rate(fluentd_input_status_num_records_total[5m]) == 0
for: 10m
labels:
severity: warning
annotations:
summary: "No data flowing through {{ $labels.instance }}"
description: "No records processed in the last 10 minutes"YAMLHealth Check Script
#!/bin/bash
# comprehensive_health_check.sh
set -e
FLUENTD_HOST="localhost"
MONITOR_PORT="24220"
HEALTH_URL="http://${FLUENTD_HOST}:${MONITOR_PORT}/api/plugins.json"
LOG_FILE="/var/log/fluent/fluentd.log"
BUFFER_DIR="/var/log/fluent/buffer"
# Colors for output
RED='\033[0;31m'
YELLOW='\033[1;33m'
GREEN='\033[0;32m'
NC='\033[0m' # No Color
# Function to print colored output
print_status() {
local status=$1
local message=$2
case $status in
"OK")
echo -e "${GREEN}✓ OK${NC}: $message"
;;
"WARNING")
echo -e "${YELLOW}⚠ WARNING${NC}: $message"
;;
"CRITICAL")
echo -e "${RED}✗ CRITICAL${NC}: $message"
;;
esac
}
# Check if Fluentd process is running
check_process() {
if pgrep -f fluentd > /dev/null; then
print_status "OK" "Fluentd process is running"
return 0
else
print_status "CRITICAL" "Fluentd process not found"
return 2
fi
}
# Check HTTP API endpoint
check_api() {
if curl -f -s "$HEALTH_URL" > /dev/null; then
print_status "OK" "Monitor agent responding"
return 0
else
print_status "CRITICAL" "Monitor agent not responding"
return 2
fi
}
# Check buffer health
check_buffers() {
local buffer_data
buffer_data=$(curl -s "$HEALTH_URL" 2>/dev/null)
if [ $? -ne 0 ]; then
print_status "CRITICAL" "Cannot retrieve buffer information"
return 2
fi
local total_queue_length
total_queue_length=$(echo "$buffer_data" | jq '.plugins[] | select(.buffer_queue_length != null) | .buffer_queue_length' | awk '{sum+=$1} END {print sum+0}')
if [ "$total_queue_length" -gt 1000 ]; then
print_status "CRITICAL" "High buffer queue length: $total_queue_length"
return 2
elif [ "$total_queue_length" -gt 500 ]; then
print_status "WARNING" "Moderate buffer queue length: $total_queue_length"
return 1
else
print_status "OK" "Buffer queue length: $total_queue_length"
return 0
fi
}
# Check recent errors
check_errors() {
if [ ! -f "$LOG_FILE" ]; then
print_status "WARNING" "Log file not found: $LOG_FILE"
return 1
fi
local error_count
error_count=$(tail -n 1000 "$LOG_FILE" | grep -c "\[error\]" || true)
if [ "$error_count" -gt 10 ]; then
print_status "CRITICAL" "High error count in recent logs: $error_count"
return 2
elif [ "$error_count" -gt 5 ]; then
print_status "WARNING" "Moderate error count in recent logs: $error_count"
return 1
else
print_status "OK" "Error count in recent logs: $error_count"
return 0
fi
}
# Check disk space
check_disk_space() {
if [ ! -d "$BUFFER_DIR" ]; then
print_status "WARNING" "Buffer directory not found: $BUFFER_DIR"
return 1
fi
local usage
usage=$(df "$BUFFER_DIR" | awk 'NR==2 {print $5}' | sed 's/%//')
if [ "$usage" -gt 90 ]; then
print_status "CRITICAL" "High disk usage: ${usage}%"
return 2
elif [ "$usage" -gt 80 ]; then
print_status "WARNING" "Moderate disk usage: ${usage}%"
return 1
else
print_status "OK" "Disk usage: ${usage}%"
return 0
fi
}
# Check memory usage
check_memory() {
local memory_usage
# Sum %mem across the Fluentd supervisor and worker processes
memory_usage=$(ps -o %mem --no-headers -p "$(pgrep -d, -f fluentd)" | awk '{sum+=$1} END {print int(sum)}')
if [ "$memory_usage" -gt 80 ]; then
print_status "CRITICAL" "High memory usage: ${memory_usage}%"
return 2
elif [ "$memory_usage" -gt 60 ]; then
print_status "WARNING" "Moderate memory usage: ${memory_usage}%"
return 1
else
print_status "OK" "Memory usage: ${memory_usage}%"
return 0
fi
}
# Main execution
main() {
echo "=== Fluentd Health Check ==="
echo "Timestamp: $(date)"
echo "Host: $(hostname)"
echo ""
local exit_code=0
check_process || exit_code=$?
check_api || exit_code=$?
check_buffers || exit_code=$?
check_errors || exit_code=$?
check_disk_space || exit_code=$?
check_memory || exit_code=$?
echo ""
case $exit_code in
0)
echo -e "${GREEN}Overall Status: HEALTHY${NC}"
;;
1)
echo -e "${YELLOW}Overall Status: WARNING${NC}"
;;
2)
echo -e "${RED}Overall Status: CRITICAL${NC}"
;;
esac
exit $exit_code
}
main "$@"BashThis enhanced troubleshooting section provides comprehensive guidance for diagnosing and resolving common Fluentd issues, with practical examples and actionable solutions.
11.8 Alerting Setup
Prometheus Alerting Rules
# fluentd_alerts.yml
groups:
- name: fluentd
rules:
- alert: FluentdDown
expr: up{job="fluentd"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Fluentd instance is down"
- alert: FluentdHighBufferUsage
expr: fluentd_status_buffer_queue_length > 100
for: 5m
labels:
severity: warning
annotations:
summary: "Fluentd buffer queue is high"
- alert: FluentdParseErrors
expr: rate(fluentd_input_status_num_records_total[5m]) == 0
for: 10m
labels:
severity: warning
annotations:
summary: "Fluentd is not processing any records"YAMLCustom Health Check
<source>
@type exec
command /usr/local/bin/fluentd_health_check.sh
format json
tag fluentd.health
run_interval 60s
</source>
<match fluentd.health>
@type copy
# Send to monitoring system
<store>
@type elasticsearch
index_name fluentd-health
</store>
# Alert on failures (grep is a filter, so route a copy through a label)
<store>
@type relabel
@label @HEALTH_ALERTS
</store>
</match>
<label @HEALTH_ALERTS>
<filter fluentd.health>
@type grep
<regexp>
key status
pattern ERROR
</regexp>
</filter>
<match fluentd.health>
@type slack
webhook_url YOUR_WEBHOOK_URL
channel ops-alerts
</match>
</label>
11.9 Performance Monitoring Dashboard
Grafana Dashboard Example
{
"dashboard": {
"title": "Fluentd Monitoring",
"panels": [
{
"title": "Events per Second",
"type": "graph",
"targets": [
{
"expr": "rate(fluentd_input_status_num_records_total[1m])",
"legendFormat": "{{tag}}"
}
]
},
{
"title": "Buffer Queue Length",
"type": "graph",
"targets": [
{
"expr": "fluentd_status_buffer_queue_length",
"legendFormat": "{{tag}}"
}
]
},
{
"title": "Memory Usage",
"type": "graph",
"targets": [
{
"expr": "process_resident_memory_bytes{job=\"fluentd\"}",
"legendFormat": "Memory Usage"
}
]
}
]
}
}
12. Advanced Patterns
12.1 Multi-tenant Logging
Handle logs from multiple tenants with isolation and routing.
graph TD
A[Tenant A Logs] --> B[Fluentd Router]
C[Tenant B Logs] --> B
D[Tenant C Logs] --> B
B --> E{Tenant Routing}
E --> F[Tenant A ES Index]
E --> G[Tenant B ES Index]
E --> H[Tenant C ES Index]
B --> I[Shared Analytics]
B --> J[Audit Logs]
style B fill:#f9f,stroke:#333,stroke-width:2px
# Multi-tenant configuration
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
# Extract tenant from tag or record
<filter **>
@type record_transformer
enable_ruby true
<record>
tenant_id ${
if tag.start_with?('tenant.')
tag.split('.')[1]
elsif record.has_key?('tenant_id')
record['tenant_id']
else
'default'
end
}
</record>
</filter>
# Route to tenant-specific outputs
<match **>
@type rewrite_tag_filter
<rule>
key tenant_id
pattern ^(.+)$
tag routed.$1.${tag}
</rule>
</match>
# Tenant A
<match routed.tenant-a.**>
@type elasticsearch
host es-tenant-a.example.com
index_name tenant-a-logs
<buffer>
@type file
path /var/log/fluent/buffer/tenant-a
</buffer>
</match>
# Tenant B
<match routed.tenant-b.**>
@type elasticsearch
host es-tenant-b.example.com
index_name tenant-b-logs
<buffer>
@type file
path /var/log/fluent/buffer/tenant-b
</buffer>
</match>
# Default tenant
<match routed.default.**>
@type file
path /var/log/fluent/default-tenant.log
</match>
12.2 Event Enrichment Pipeline
Comprehensive data enrichment example.
# Raw log input
<source>
@type tail
path /var/log/nginx/access.log
tag nginx.access.raw
<parse>
@type nginx
</parse>
</source>
# Step 1: Basic enrichment
<filter nginx.access.raw>
@type record_transformer
<record>
hostname ${hostname}
environment production
service_name nginx
log_type access
ingestion_time ${Time.now.utc.iso8601}
</record>
</filter>
# Step 2: GeoIP enrichment
<filter nginx.access.raw>
@type geoip
geoip_lookup_keys remote
<record>
country_code ${city.country.iso_code}
country_name ${city.country.names.en}
city_name ${city.city.names.en}
latitude ${location.latitude}
longitude ${location.longitude}
timezone ${location.time_zone}
</record>
skip_adding_null_record false
</filter>
# Step 3: User agent parsing
<filter nginx.access.raw>
@type parser
key_name agent
reserve_data true
inject_key_prefix ua_
<parse>
@type user_agent
</parse>
</filter>
# Step 4: Request classification
<filter nginx.access.raw>
@type record_transformer
enable_ruby true
<record>
request_type ${
case record["path"]
when /^\/api\//
"api"
when /^\/static\//
"static"
when /^\/admin\//
"admin"
else
"web"
end
}
is_bot ${record["ua_name"] =~ /(bot|crawler|spider)/i ? true : false}
response_class ${
status = record["status"].to_i
case status
when 200..299
"success"
when 300..399
"redirect"
when 400..499
"client_error"
when 500..599
"server_error"
else
"unknown"
end
}
</record>
</filter>
# Step 5: Security analysis
<filter nginx.access.raw>
@type record_transformer
enable_ruby true
<record>
security_flags ${
flags = []
# Check for suspicious patterns
flags << "sql_injection" if record["path"] =~ /('|(union\s+select)|(drop\s+table))/i
flags << "xss_attempt" if record["path"] =~ /(<script|javascript:|onload=)/i
flags << "path_traversal" if record["path"] =~ /(\.\.\/|\.\.\\)/
flags << "large_request" if record["request_length"].to_i > 100000
flags.join(",")
}
risk_score ${
score = 0
score += 10 if record["status"].to_i >= 400
score += 20 if record["response_time"].to_f > 5.0
score += 30 if record["security_flags"].length > 0
score += 5 if record["is_bot"]
score
}
</record>
</filter>
# Route enriched data
<match nginx.access.raw>
@type rewrite_tag_filter
<rule>
key risk_score
pattern ^([3-9]\d|\d{3,})$
tag security.high_risk.nginx.access
</rule>
<rule>
key request_type
pattern api
tag analytics.api.nginx.access
</rule>
<rule>
key request_type
pattern .*
tag analytics.web.nginx.access
</rule>
</match>
12.3 Dead Letter Queue Pattern
Handle failed events with retry and dead letter queues.
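The configuration below writes failed events to dated JSON files under /var/log/fluent/dlq/ and later re-reads them automatically. While debugging, a single DLQ file can also be replayed by hand; a sketch using fluent-cat (bundled with the fluentd gem), assuming a forward input is listening on the default port 24224 and using a hypothetical file name matching the pattern below:
# Replay one DLQ file back into Fluentd under a dedicated tag (sketch)
while IFS= read -r line; do
  echo "$line" | fluent-cat dlq.replay
done < /var/log/fluent/dlq/failed_events.20240101.log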
# Main processing pipeline
<match app.**>
@type copy
# Primary output
<store>
@type elasticsearch
host elasticsearch.example.com
index_name app-logs
<buffer>
@type file
path /var/log/fluent/buffer/elasticsearch
retry_type exponential_backoff
retry_max_times 3
retry_timeout 1h
</buffer>
# Send failures to DLQ
<secondary>
@type relabel
@label @DLQ
</secondary>
</store>
</match>
# Dead Letter Queue processing
<label @DLQ>
<filter **>
@type record_transformer
<record>
dlq_timestamp ${Time.now.utc.iso8601}
dlq_reason elasticsearch_failure
original_tag ${tag}
</record>
</filter>
# Store in DLQ
<match **>
@type file
path /var/log/fluent/dlq/failed_events.%Y%m%d.log
<format>
@type json
</format>
<buffer time>
@type file
path /var/log/fluent/buffer/dlq
timekey 3600
timekey_wait 300
</buffer>
</match>
</label>
# DLQ reprocessing job (separate process)
<source>
@type tail
path /var/log/fluent/dlq/failed_events.*.log
tag dlq.reprocess
<parse>
@type json
</parse>
</source>
<filter dlq.reprocess>
@type record_transformer
<record>
reprocess_attempt ${(record["reprocess_attempt"] || 0).to_i + 1}
reprocess_timestamp ${Time.now.utc.iso8601}
</record>
</filter>
# Retry with exponential backoff
<match dlq.reprocess>
@type elasticsearch
host elasticsearch.example.com
index_name app-logs-dlq
<buffer>
@type file
path /var/log/fluent/buffer/dlq-retry
retry_type exponential_backoff
retry_max_times 5
retry_timeout 24h
</buffer>
</match>
12.4 Circuit Breaker Pattern
Implement circuit breaker for external service protection.
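Because the breaker state below is derived from a plain timestamp file, it can be inspected and reset from the shell while testing. A sketch matching the /tmp/elasticsearch_errors file used in the filter (date -d is GNU date):
# Inspect the breaker's error history (one epoch timestamp per line)
ERR_FILE=/tmp/elasticsearch_errors
if [ -f "$ERR_FILE" ]; then
  echo "errors recorded: $(wc -l < "$ERR_FILE"), last at $(date -d @"$(tail -n 1 "$ERR_FILE")")"
fi
# Manually close the circuit once the downstream service has recovered
rm -f "$ERR_FILE"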
# Circuit breaker filter
<filter app.**>
@type record_transformer
enable_ruby true
<record>
circuit_breaker_state ${
# Simple circuit breaker logic
error_file = "/tmp/elasticsearch_errors"
current_time = Time.now.to_i
# Read error count and timestamp
if File.exist?(error_file)
lines = File.readlines(error_file)
last_error_time = lines.last.to_i if lines.any?
error_count = lines.size
else
last_error_time = 0
error_count = 0
end
# Circuit breaker states
if error_count >= 5 && (current_time - last_error_time) < 300
"open" # Circuit open, bypass for 5 minutes
elsif error_count >= 5 && (current_time - last_error_time) >= 300
"half_open" # Try again
else
"closed" # Normal operation
end
}
</record>
</filter>
# Route based on circuit breaker state
<match app.**>
@type rewrite_tag_filter
<rule>
key circuit_breaker_state
pattern open
tag circuit_breaker.bypass.${tag}
</rule>
<rule>
key circuit_breaker_state
pattern .*
tag circuit_breaker.normal.${tag}
</rule>
</match>
# Normal processing (circuit closed/half-open)
<match circuit_breaker.normal.**>
@type elasticsearch
host elasticsearch.example.com
<buffer>
retry_max_times 2
</buffer>
# On failure, record error and use fallback
<secondary>
@type exec
command echo $(date +%s) >> /tmp/elasticsearch_errors
<format>
@type none
</format>
</secondary>
</match>
# Bypass processing (circuit open)
<match circuit_breaker.bypass.**>
@type file
path /var/log/fluent/circuit_breaker_bypass.log
<format>
@type json
</format>
</match>
12.5 Data Lake Pattern
Organize data for analytics and data science.
# Multi-format data lake storage
<match app.**>
@type copy
# Real-time analytics (Elasticsearch)
<store>
@type elasticsearch
host elasticsearch.example.com
index_name app-logs
</store>
# Data lake storage (S3 with partitioning)
<store>
@type s3
s3_bucket data-lake-raw
# Partitioned by date and service
path raw/service=${tag_parts[0]}/year=%Y/month=%m/day=%d/hour=%H/
s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
# Multiple formats for different use cases
<format>
@type json
</format>
<buffer time,tag>
@type file
path /var/log/fluent/buffer/datalake
timekey 3600
timekey_wait 300
chunk_limit_size 128MB
</buffer>
</store>
# Processed data for ML (Parquet format)
<store>
@type s3
s3_bucket data-lake-processed
path processed/service=${tag_parts[0]}/year=%Y/month=%m/day=%d/
# Use Parquet for efficient analytics
<format>
@type parquet
schema [
{name: "timestamp", type: "string"},
{name: "level", type: "string"},
{name: "message", type: "string"},
{name: "service", type: "string"},
{name: "hostname", type: "string"}
]
</format>
<buffer time,tag>
@type file
path /var/log/fluent/buffer/parquet
timekey 86400 # Daily files
timekey_wait 3600
chunk_limit_size 256MB
</buffer>
</store>
</match>
13. Custom Plugin Development
13.1 Plugin Architecture
graph TD
A[Fluentd Plugin System] --> B[Input Plugins]
A --> C[Output Plugins]
A --> D[Filter Plugins]
A --> E[Parser Plugins]
A --> F[Formatter Plugins]
A --> G[Buffer Plugins]
B --> H[Base Input Class]
C --> I[Base Output Class]
D --> J[Base Filter Class]
E --> K[Base Parser Class]
F --> L[Base Formatter Class]
G --> M[Base Buffer Class]
style A fill:#e3f2fd
style H fill:#e8f5e8
style I fill:#fff3e0
style J fill:#f3e5f5
style K fill:#ffebee
style L fill:#e0f2f1
style M fill:#fce4ec
13.2 Input Plugin Example
Create a custom input plugin that reads from a Redis list.
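While developing the plugin, test records can be pushed onto the list with redis-cli so the poll loop has something to read. The key name here matches the usage configuration further below and is otherwise arbitrary:
# Push two well-formed records and one malformed one onto the list
redis-cli -h localhost -p 6379 RPUSH application_logs \
  '{"message":"hello from redis","level":"info"}' \
  '{"message":"second event","level":"warn"}' \
  'not-json-at-all'
# Confirm the list length before Fluentd drains it
redis-cli -h localhost -p 6379 LLEN application_logs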
# filepath: lib/fluent/plugin/in_redis_list.rb
require 'fluent/plugin/input'
require 'redis'
module Fluent
module Plugin
class RedisListInput < Fluent::Plugin::Input
Fluent::Plugin.register_input('redis_list', self)
desc 'Redis host'
config_param :host, :string, default: 'localhost'
desc 'Redis port'
config_param :port, :integer, default: 6379
desc 'Redis password'
config_param :password, :string, default: nil, secret: true
desc 'Redis database number'
config_param :db, :integer, default: 0
desc 'Redis list key to read from'
config_param :key, :string
desc 'Tag to assign to events'
config_param :tag, :string
desc 'Polling interval in seconds'
config_param :interval, :time, default: 1
desc 'Batch size for each poll'
config_param :batch_size, :integer, default: 100
def configure(conf)
super
@redis_config = {
host: @host,
port: @port,
password: @password,
db: @db,
timeout: 5
}
end
def start
super
@redis = Redis.new(@redis_config)
@thread = Thread.new(&method(:run))
end
def shutdown
@thread.terminate if @thread
@redis.quit if @redis
super
end
private
def run
while true
begin
messages = @redis.lrange(@key, 0, @batch_size - 1)
if messages.any?
# Remove processed messages
@redis.ltrim(@key, messages.size, -1)
# Process each message
messages.each do |message|
begin
record = JSON.parse(message)
time = record.delete('timestamp') || Engine.now
router.emit(@tag, time, record)
rescue JSON::ParserError => e
log.warn "Failed to parse JSON: #{message}", error: e
# Emit as raw message
router.emit(@tag, Engine.now, {'raw_message' => message})
end
end
end
sleep @interval
rescue => e
log.error "Redis list input error", error: e
sleep @interval
end
end
end
end
end
end
Usage configuration:
<source>
@type redis_list
host redis.example.com
port 6379
password secret
key application_logs
tag app.redis_logs
interval 2s
batch_size 50
</source>
13.3 Filter Plugin Example
Create a filter that adds machine learning predictions.
# filepath: lib/fluent/plugin/filter_ml_classifier.rb
require 'fluent/plugin/filter'
require 'net/http'
require 'json'
module Fluent
module Plugin
class MLClassifierFilter < Fluent::Plugin::Filter
Fluent::Plugin.register_filter('ml_classifier', self)
desc 'ML service endpoint URL'
config_param :endpoint_url, :string
desc 'API key for ML service'
config_param :api_key, :string, secret: true
desc 'Fields to send for classification'
config_param :input_fields, :array, default: ['message']
desc 'Field name for prediction result'
config_param :prediction_field, :string, default: 'ml_prediction'
desc 'Field name for confidence score'
config_param :confidence_field, :string, default: 'ml_confidence'
desc 'Timeout for ML service calls'
config_param :timeout, :time, default: 5
desc 'Cache TTL for predictions'
config_param :cache_ttl, :time, default: 300
def configure(conf)
super
@cache = {}
@uri = URI(@endpoint_url)
end
def filter(tag, time, record)
begin
# Create cache key from input fields
cache_key = @input_fields.map { |field| record[field] }.join('|')
# Check cache first
if cached_result = get_from_cache(cache_key)
record[@prediction_field] = cached_result[:prediction]
record[@confidence_field] = cached_result[:confidence]
return record
end
# Prepare ML service request
input_data = {}
@input_fields.each do |field|
input_data[field] = record[field] if record.key?(field)
end
# Call ML service
prediction_result = call_ml_service(input_data)
if prediction_result
record[@prediction_field] = prediction_result[:prediction]
record[@confidence_field] = prediction_result[:confidence]
# Cache the result
set_cache(cache_key, prediction_result)
end
rescue => e
log.warn "ML classification failed", error: e, record: record
record[@prediction_field] = 'error'
record[@confidence_field] = 0.0
end
record
end
private
def call_ml_service(input_data)
http = Net::HTTP.new(@uri.host, @uri.port)
http.use_ssl = (@uri.scheme == 'https')
http.read_timeout = @timeout
request = Net::HTTP::Post.new(@uri.path)
request['Content-Type'] = 'application/json'
request['Authorization'] = "Bearer #{@api_key}"
request.body = input_data.to_json
response = http.request(request)
if response.code == '200'
result = JSON.parse(response.body)
{
prediction: result['prediction'],
confidence: result['confidence']
}
else
log.warn "ML service returned error", code: response.code, body: response.body
nil
end
end
def get_from_cache(key)
if @cache.key?(key)
entry = @cache[key]
if Time.now - entry[:timestamp] < @cache_ttl
return entry[:result]
else
@cache.delete(key)
end
end
nil
end
def set_cache(key, result)
@cache[key] = {
result: result,
timestamp: Time.now
}
# Simple cache cleanup
if @cache.size > 1000
@cache = @cache.select { |k, v| Time.now - v[:timestamp] < @cache_ttl }
end
end
end
end
end
13.4 Output Plugin Example
Create an output plugin for a custom webhook service.
# filepath: lib/fluent/plugin/out_webhook.rb
require 'fluent/plugin/output'
require 'net/http'
require 'json'
require 'openssl'
module Fluent
module Plugin
class WebhookOutput < Fluent::Plugin::Output
Fluent::Plugin.register_output('webhook', self)
desc 'Webhook endpoint URL'
config_param :endpoint_url, :string
desc 'HTTP method'
config_param :http_method, :enum, list: [:post, :put, :patch], default: :post
desc 'Request headers'
config_param :headers, :hash, default: {}
desc 'Authentication type'
config_param :auth_type, :enum, list: [:none, :basic, :bearer, :signature], default: :none
desc 'Username for basic auth'
config_param :username, :string, default: nil, secret: true
desc 'Password for basic auth'
config_param :password, :string, default: nil, secret: true
desc 'Bearer token'
config_param :bearer_token, :string, default: nil, secret: true
desc 'Signature secret for HMAC'
config_param :signature_secret, :string, default: nil, secret: true
desc 'Signature header name'
config_param :signature_header, :string, default: 'X-Signature'
desc 'Request timeout'
config_param :timeout, :time, default: 10
desc 'SSL verification'
config_param :ssl_verify, :bool, default: true
def configure(conf)
super
@uri = URI(@endpoint_url)
# Validate configuration
if @auth_type == :basic && (@username.nil? || @password.nil?)
raise Fluent::ConfigError, 'username and password required for basic auth'
end
if @auth_type == :bearer && @bearer_token.nil?
raise Fluent::ConfigError, 'bearer_token required for bearer auth'
end
if @auth_type == :signature && @signature_secret.nil?
raise Fluent::ConfigError, 'signature_secret required for signature auth'
end
end
def multi_workers_ready?
true
end
def write(chunk)
events = []
chunk.msgpack_each do |time, record|
events << {
timestamp: Time.at(time).utc.iso8601,
record: record
}
end
payload = {
events: events,
metadata: {
count: events.size,
chunk_id: chunk.unique_id
}
}
send_webhook(payload)
end
private
def send_webhook(payload)
http = Net::HTTP.new(@uri.host, @uri.port)
http.use_ssl = (@uri.scheme == 'https')
http.verify_mode = @ssl_verify ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE
http.read_timeout = @timeout
http.open_timeout = @timeout
# Create request
request = case @http_method
when :post
Net::HTTP::Post.new(@uri.path)
when :put
Net::HTTP::Put.new(@uri.path)
when :patch
Net::HTTP::Patch.new(@uri.path)
end
# Set headers
request['Content-Type'] = 'application/json'
@headers.each { |key, value| request[key] = value }
# Set authentication
case @auth_type
when :basic
request.basic_auth(@username, @password)
when :bearer
request['Authorization'] = "Bearer #{@bearer_token}"
when :signature
body = payload.to_json
signature = OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha256'), @signature_secret, body)
request[@signature_header] = "sha256=#{signature}"
request.body = body
end
# Set body if not already set
request.body ||= payload.to_json
# Send request
response = http.request(request)
unless response.code.start_with?('2')
raise "Webhook request failed: #{response.code} #{response.body}"
end
log.debug "Webhook sent successfully", code: response.code, events: payload[:metadata][:count]
rescue => e
log.error "Failed to send webhook", error: e
raise
end
end
end
end
13.5 Plugin Testing
Create comprehensive tests for your plugins.
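Tests like the one below are normally run through Rake or directly with the test-unit runner. A sketch assuming a standard gem layout whose Rakefile defines the usual test task:
# Install development dependencies and run the whole suite
bundle install
bundle exec rake test
# Run a single test file while iterating on one plugin
bundle exec ruby -Ilib -Itest test/plugin/test_in_redis_list.rb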
# filepath: test/plugin/test_in_redis_list.rb
require 'helper'
require 'fluent/test/driver/input'
require 'fluent/plugin/in_redis_list'
class RedisListInputTest < Test::Unit::TestCase
def setup
Fluent::Test.setup
end
CONFIG = %[
host localhost
port 6379
key test_list
tag test.redis
interval 1s
batch_size 10
]
def create_driver(conf = CONFIG)
Fluent::Test::Driver::Input.new(Fluent::Plugin::RedisListInput).configure(conf)
end
test 'configure' do
d = create_driver
assert_equal 'localhost', d.instance.host
assert_equal 6379, d.instance.port
assert_equal 'test_list', d.instance.key
assert_equal 'test.redis', d.instance.tag
end
test 'emit events from redis list' do
# Mock Redis
redis_mock = mock('redis')
redis_mock.expects(:lrange).returns(['{"message": "test1"}', '{"message": "test2"}'])
redis_mock.expects(:ltrim).with('test_list', 2, -1)
redis_mock.expects(:quit)
Redis.stubs(:new).returns(redis_mock)
d = create_driver
d.run(expect_emits: 2, timeout: 5) do
# Events will be emitted by the plugin
end
events = d.events
assert_equal 2, events.length
assert_equal 'test.redis', events[0][0]
assert_equal({'message' => 'test1'}, events[0][2])
assert_equal({'message' => 'test2'}, events[1][2])
end
end
13.6 Plugin Distribution
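Once the gem layout and gemspec below are in place, packaging follows the usual RubyGems flow; fluent-gem is the gem command shipped with fluent-package installs:
# Build the gem from its gemspec
gem build fluent-plugin-myservice.gemspec
# Install into a plain Ruby environment
gem install ./fluent-plugin-myservice-1.0.0.gem
# Or install into a fluent-package (td-agent successor) installation
sudo fluent-gem install ./fluent-plugin-myservice-1.0.0.gem
# Publish to rubygems.org when ready
gem push fluent-plugin-myservice-1.0.0.gem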
Gem Structure
fluent-plugin-myservice/
├── fluent-plugin-myservice.gemspec
├── lib/
│ └── fluent/
│ └── plugin/
│ ├── in_myservice.rb
│ ├── out_myservice.rb
│ └── filter_myservice.rb
├── test/
│ └── plugin/
│ ├── test_in_myservice.rb
│ ├── test_out_myservice.rb
│ └── test_filter_myservice.rb
├── README.md
└── LICENSE
Gemspec Example
# filepath: fluent-plugin-myservice.gemspec
Gem::Specification.new do |spec|
spec.name = "fluent-plugin-myservice"
spec.version = "1.0.0"
spec.authors = ["Your Name"]
spec.email = ["your.email@example.com"]
spec.summary = "Fluentd plugin for MyService"
spec.description = "Input, output, and filter plugins for MyService integration"
spec.homepage = "https://github.com/yourname/fluent-plugin-myservice"
spec.license = "MIT"
spec.files = Dir['lib/**/*', 'README.md', 'LICENSE']
spec.require_paths = ["lib"]
spec.add_dependency "fluentd", ">= 1.0", "< 2"
spec.add_dependency "httparty", "~> 0.18"
spec.add_development_dependency "bundler", "~> 2.0"
spec.add_development_dependency "rake", "~> 13.0"
spec.add_development_dependency "test-unit", "~> 3.0"
spec.add_development_dependency "mocha", "~> 1.0"
end
14. Production Deployment
14.1 Deployment Architecture
graph TB
A[Application Servers] --> B[Fluentd Forwarders]
C[System Logs] --> B
D[Container Logs] --> E[Fluentd DaemonSet]
B --> F[Load Balancer]
E --> F
F --> G[Fluentd Aggregators]
G --> H[Elasticsearch Cluster]
G --> I[S3 Storage]
G --> J[Monitoring Systems]
K[Fluentd Monitoring] --> G
L[Config Management] --> G
style G fill:#f9f,stroke:#333,stroke-width:2px
style F fill:#e3f2fd
14.2 High Availability Setup
Load Balancer Configuration
# nginx upstream configuration
upstream fluentd_aggregators {
least_conn;
server fluentd-agg-1:24224 weight=1 max_fails=3 fail_timeout=30s;
server fluentd-agg-2:24224 weight=1 max_fails=3 fail_timeout=30s;
server fluentd-agg-3:24224 weight=1 max_fails=3 fail_timeout=30s;
keepalive 32;
}
server {
listen 24224;
proxy_pass fluentd_aggregators;
proxy_timeout 60s;
proxy_connect_timeout 5s;
}
Aggregator Configuration
# fluentd-aggregator.conf
<system>
workers 4
root_dir /var/log/fluent
log_level info
</system>
# Input from forwarders
<source>
@type forward
port 24224
bind 0.0.0.0
# Security
<security>
self_hostname "#{ENV['HOSTNAME']}"
shared_key "#{ENV['FLUENT_SHARED_KEY']}"
</security>
# Performance
chunk_size_limit 8MB
chunk_size_warn_limit 8MB
</source>
# Health check endpoint
<source>
@type http
port 8888
bind 0.0.0.0
</source>
# Metrics
<source>
@type monitor_agent
port 24220
bind 0.0.0.0
</source>
<source>
@type prometheus
port 24231
bind 0.0.0.0
</source>
# Processing pipeline
@include conf.d/*.conf
# Fallback output
<match **>
@type file
path /var/log/fluent/fallback/events.%Y%m%d.log
<format>
@type json
</format>
<buffer time>
@type file
path /var/log/fluent/buffer/fallback
timekey 3600
timekey_wait 300
</buffer>
</match>
Forwarder Configuration
# fluentd-forwarder.conf
<system>
log_level info
suppress_repeated_stacktrace true
</system>
# Local log collection
<source>
@type tail
path /var/log/app/*.log
pos_file /var/log/fluent/app.log.pos
tag app.logs
<parse>
@type json
time_key timestamp
time_format %Y-%m-%d %H:%M:%S %z
</parse>
</source>
<source>
@type tail
path /var/log/nginx/*.log
pos_file /var/log/fluent/nginx.log.pos
tag nginx.logs
<parse>
@type nginx
</parse>
</source>
# Forward to aggregators
<match **>
@type forward
# Multiple aggregators for HA
<server>
name agg1
host fluentd-agg-1.example.com
port 24224
weight 60
</server>
<server>
name agg2
host fluentd-agg-2.example.com
port 24224
weight 60
standby
</server>
<server>
name agg3
host fluentd-agg-3.example.com
port 24224
weight 60
standby
</server>
# Security
<security>
self_hostname "#{ENV['HOSTNAME']}"
shared_key "#{ENV['FLUENT_SHARED_KEY']}"
</security>
# Buffering
<buffer>
@type file
path /var/log/fluent/buffer/forward
chunk_limit_size 16MB
queue_limit_length 128
flush_mode interval
flush_interval 10s
retry_type exponential_backoff
retry_forever true
</buffer>
# Fallback to local storage
<secondary>
@type file
path /var/log/fluent/failed/forward.%Y%m%d.log
<format>
@type json
</format>
</secondary>
</match>
14.3 Kubernetes Deployment
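After applying the manifests in this section, rollout status and basic health can be verified with kubectl; the namespace and labels match the manifests below:
# DaemonSet rollout across nodes
kubectl -n logging rollout status daemonset/fluentd
kubectl -n logging get pods -l name=fluentd -o wide
# Aggregator deployment and recent logs
kubectl -n logging rollout status deployment/fluentd-aggregator
kubectl -n logging logs deployment/fluentd-aggregator --tail=50
# Probe the monitor endpoint of an aggregator pod
kubectl -n logging port-forward deploy/fluentd-aggregator 24220:24220 &
curl -s http://localhost:24220/api/plugins.json | jq '.plugins | length'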
DaemonSet for Log Collection
# fluentd-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fluentd
namespace: logging
labels:
k8s-app: fluentd-logging
spec:
selector:
matchLabels:
name: fluentd
template:
metadata:
labels:
name: fluentd
spec:
serviceAccountName: fluentd
tolerations:
- key: node-role.kubernetes.io/master
effect: NoSchedule
containers:
- name: fluentd
image: fluent/fluentd-kubernetes-daemonset:v1-debian-elasticsearch
env:
- name: FLUENT_ELASTICSEARCH_HOST
value: "elasticsearch.logging.svc.cluster.local"
- name: FLUENT_ELASTICSEARCH_PORT
value: "9200"
- name: FLUENT_ELASTICSEARCH_SCHEME
value: "http"
- name: FLUENTD_SYSTEMD_CONF
value: disable
resources:
limits:
memory: 200Mi
cpu: 100m
requests:
memory: 200Mi
cpu: 100m
volumeMounts:
- name: varlog
mountPath: /var/log
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
readOnly: true
- name: fluentd-config
mountPath: /fluentd/etc/conf.d
terminationGracePeriodSeconds: 30
volumes:
- name: varlog
hostPath:
path: /var/log
- name: varlibdockercontainers
hostPath:
path: /var/lib/docker/containers
- name: fluentd-config
configMap:
name: fluentd-config
Aggregator Deployment
# fluentd-aggregator.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: fluentd-aggregator
namespace: logging
spec:
replicas: 3
selector:
matchLabels:
app: fluentd-aggregator
template:
metadata:
labels:
app: fluentd-aggregator
spec:
containers:
- name: fluentd
image: fluent/fluentd:v1.15-1
ports:
- containerPort: 24224
name: forward
- containerPort: 24220
name: monitor
- containerPort: 24231
name: metrics
env:
- name: FLUENTD_CONF
value: fluent.conf
- name: FLUENT_ELASTICSEARCH_HOST
value: elasticsearch.logging.svc.cluster.local
resources:
limits:
memory: 1Gi
cpu: 500m
requests:
memory: 1Gi
cpu: 500m
volumeMounts:
- name: fluentd-config
mountPath: /fluentd/etc
- name: buffer-storage
mountPath: /var/log/fluent
livenessProbe:
httpGet:
path: /api/plugins.json
port: 24220
initialDelaySeconds: 30
periodSeconds: 30
readinessProbe:
httpGet:
path: /api/plugins.json
port: 24220
initialDelaySeconds: 10
periodSeconds: 10
volumes:
- name: fluentd-config
configMap:
name: fluentd-aggregator-config
- name: buffer-storage
emptyDir:
sizeLimit: 10Gi
---
apiVersion: v1
kind: Service
metadata:
name: fluentd-aggregator
namespace: logging
spec:
selector:
app: fluentd-aggregator
ports:
- name: forward
port: 24224
targetPort: 24224
- name: monitor
port: 24220
targetPort: 24220
- name: metrics
port: 24231
targetPort: 24231
type: ClusterIP
14.4 Configuration Management
Ansible Playbook
# deploy-fluentd.yml
---
- hosts: fluentd_aggregators
become: yes
vars:
fluentd_version: "1.15.3"
elasticsearch_hosts:
- "es1.example.com:9200"
- "es2.example.com:9200"
- "es3.example.com:9200"
tasks:
- name: Install Fluentd
package:
name: fluent-package
state: present
- name: Create fluentd directories
file:
path: "{{ item }}"
state: directory
owner: fluent
group: fluent
mode: '0755'
loop:
- /etc/fluent/conf.d
- /var/log/fluent/buffer
- /var/log/fluent/failed
- name: Deploy main configuration
template:
src: fluent.conf.j2
dest: /etc/fluent/fluent.conf
owner: fluent
group: fluent
mode: '0644'
notify: restart fluentd
- name: Deploy pipeline configurations
template:
src: "{{ item }}.j2"
dest: "/etc/fluent/conf.d/{{ item }}"
owner: fluent
group: fluent
mode: '0644'
loop:
- 01-sources.conf
- 02-filters.conf
- 03-outputs.conf
notify: restart fluentd
- name: Configure systemd service
systemd:
name: fluentd
enabled: yes
state: started
daemon_reload: yes
handlers:
- name: restart fluentd
systemd:
name: fluentd
state: restarted
Docker Compose
# docker-compose.yml
version: '3.8'
services:
fluentd-aggregator-1:
image: fluent/fluentd:v1.15-1
container_name: fluentd-agg-1
ports:
- "24224:24224"
- "24220:24220"
- "24231:24231"
volumes:
- ./config:/fluentd/etc
- fluentd-buffer-1:/var/log/fluent
environment:
- FLUENTD_CONF=fluent.conf
- FLUENT_ELASTICSEARCH_HOST=elasticsearch
depends_on:
- elasticsearch
restart: unless-stopped
networks:
- logging
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:24220/api/plugins.json"]
interval: 30s
timeout: 10s
retries: 3
fluentd-aggregator-2:
image: fluent/fluentd:v1.15-1
container_name: fluentd-agg-2
ports:
- "24225:24224"
- "24221:24220"
- "24232:24231"
volumes:
- ./config:/fluentd/etc
- fluentd-buffer-2:/var/log/fluent
environment:
- FLUENTD_CONF=fluent.conf
- FLUENT_ELASTICSEARCH_HOST=elasticsearch
depends_on:
- elasticsearch
restart: unless-stopped
networks:
- logging
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.15.0
container_name: elasticsearch
environment:
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ports:
- "9200:9200"
volumes:
- elasticsearch-data:/usr/share/elasticsearch/data
networks:
- logging
nginx-lb:
image: nginx:alpine
container_name: fluentd-lb
ports:
- "24224:24224"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
- fluentd-aggregator-1
- fluentd-aggregator-2
networks:
- logging
volumes:
fluentd-buffer-1:
fluentd-buffer-2:
elasticsearch-data:
networks:
logging:
driver: bridge
14.5 Security Hardening
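The TLS configuration below expects a CA plus server and client certificates. For a lab environment they can be generated with openssl; a sketch whose file names match the paths used below (use a proper internal CA in production):
# Self-signed CA (lab use only)
openssl req -x509 -newkey rsa:4096 -sha256 -days 365 -nodes \
  -keyout ca.key -out ca.crt -subj "/CN=fluentd-lab-ca"
# Server key and CSR, signed by the CA
openssl req -newkey rsa:4096 -nodes -keyout server.key -out server.csr \
  -subj "/CN=fluentd-aggregator"
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial \
  -days 365 -sha256 -out server.crt
# Repeat the last two commands with client.* names for client_cert_auth,
# then copy the files into place with tight permissions
sudo mkdir -p /etc/fluent/certs
sudo install -m 600 ca.crt server.crt server.key /etc/fluent/certs/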
SSL/TLS Configuration
# Secure forward input
<source>
@type forward
port 24224
bind 0.0.0.0
<transport tls>
cert_path /etc/fluent/certs/server.crt
private_key_path /etc/fluent/certs/server.key
ca_cert_path /etc/fluent/certs/ca.crt
client_cert_auth true
</transport>
<security>
self_hostname fluentd-aggregator
shared_key "#{ENV['FLUENT_SHARED_KEY']}"
<client>
host fluentd-forwarder-1
shared_key "#{ENV['FLUENT_SHARED_KEY']}"
</client>
</security>
</source>
# Secure forward output
<match **>
@type forward
<server>
host secure-aggregator.example.com
port 24224
</server>
<transport tls>
cert_path /etc/fluent/certs/client.crt
private_key_path /etc/fluent/certs/client.key
ca_cert_path /etc/fluent/certs/ca.crt
</transport>
<security>
self_hostname fluentd-forwarder
shared_key "#{ENV['FLUENT_SHARED_KEY']}"
</security>
</match>
Role-based Access Control
# Create fluentd user with minimal privileges
useradd -r -s /bin/false -d /var/lib/fluentd fluentd
# Set file permissions
chown -R fluentd:fluentd /etc/fluent
chown -R fluentd:fluentd /var/log/fluent
chmod 750 /etc/fluent
chmod 640 /etc/fluent/*.conf
chmod 600 /etc/fluent/certs/*
# SELinux context (if enabled)
semanage fcontext -a -t admin_home_exec_t "/usr/sbin/fluentd"
restorecon -R /usr/sbin/fluentd
15. Best Practices
15.1 Configuration Best Practices
1. Organize Configuration Files
# Main configuration file
# /etc/fluent/fluent.conf
<system>
@include system.conf
</system>
@include conf.d/*.conf
# Global error handling
<label @ERROR>
<match **>
@type file
path /var/log/fluent/error/error.%Y%m%d.log
<format>
@type json
</format>
</match>
</label>
# /etc/fluent/conf.d/01-inputs.conf
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
<source>
@type tail
path /var/log/app/*.log
pos_file /var/log/fluent/app.log.pos
tag app.logs
<parse>
@type json
</parse>
</source>
# /etc/fluent/conf.d/02-filters.conf
<filter app.**>
@type record_transformer
<record>
hostname ${hostname}
environment production
</record>
</filter>
<filter app.**>
@type grep
<exclude>
key path
pattern ^/health$
</exclude>
</filter>
# /etc/fluent/conf.d/03-outputs.conf
<match app.**>
@type elasticsearch
host elasticsearch.example.com
index_name app-logs
<buffer>
@type file
path /var/log/fluent/buffer/elasticsearch
flush_mode interval
flush_interval 10s
</buffer>
</match>
2. Use Environment Variables
# Environment-specific configuration
<match app.**>
@type elasticsearch
host "#{ENV['ELASTICSEARCH_HOST'] || 'localhost'}"
port "#{ENV['ELASTICSEARCH_PORT'] || 9200}"
user "#{ENV['ELASTICSEARCH_USER']}"
password "#{ENV['ELASTICSEARCH_PASSWORD']}"
index_name "#{ENV['APP_NAME'] || 'app'}-logs"
<buffer>
@type file
path /var/log/fluent/buffer/elasticsearch
flush_mode interval
flush_interval "#{ENV['FLUSH_INTERVAL'] || '10s'}"
</buffer>
</match>
3. Implement Proper Error Handling
# Comprehensive error handling strategy
<system>
# Global error routing
emit_error_log_interval 30s
suppress_repeated_stacktrace true
</system>
# Main processing with error recovery
<match app.**>
@type copy
# Primary output
<store>
@type elasticsearch
host elasticsearch.example.com
index_name app-logs
<buffer>
retry_max_times 3
retry_type exponential_backoff
</buffer>
# Route failures to error handling
<secondary>
@type relabel
@label @ERROR_RECOVERY
</secondary>
</store>
</match>
# Error recovery pipeline
<label @ERROR_RECOVERY>
<filter **>
@type record_transformer
<record>
error_timestamp ${Time.now.utc.iso8601}
error_reason primary_output_failed
</record>
</filter>
# Try alternative output
<match **>
@type s3
s3_bucket backup-logs
path error-recovery/
<buffer>
flush_mode immediate
</buffer>
# Final fallback to local storage
<secondary>
@type file
path /var/log/fluent/ultimate_fallback.log
</secondary>
</match>
</label>
15.2 Performance Best Practices
1. Optimize Input Performance
# High-performance tail configuration
<source>
@type tail
path /var/log/app/*.log
pos_file /var/log/fluent/app.log.pos
tag app.logs
# Performance optimizations
read_from_head false
refresh_interval 60
rotate_wait 5
enable_watch_timer false
enable_stat_watcher true
open_on_every_update false
read_lines_limit 1000
# Skip unnecessary parsing at input
<parse>
@type none
</parse>
</source>
# Parse only when needed
<filter app.logs>
@type parser
key_name message
reserve_data true
<parse>
@type json
json_parser oj # Fastest JSON parser
</parse>
</filter>
2. Buffer Optimization Patterns
# Pattern 1: High-throughput, low-latency
<match realtime.**>
@type elasticsearch
<buffer>
@type memory
chunk_limit_size 8MB
flush_mode immediate
flush_thread_count 4
queue_limit_length 32
</buffer>
</match>
# Pattern 2: High-volume, batch processing
<match batch.**>
@type s3
<buffer time>
@type file
path /var/log/fluent/buffer/s3
timekey 3600
timekey_wait 300
chunk_limit_size 256MB
flush_thread_count 2
</buffer>
</match>
# Pattern 3: Critical data, maximum reliability
<match critical.**>
@type elasticsearch
<buffer>
@type file
path /var/log/fluent/buffer/critical
chunk_limit_size 32MB
queue_limit_length 128
flush_mode interval
flush_interval 30s
flush_at_shutdown true
retry_forever true
overflow_action block
</buffer>
</match>
15.3 Security Best Practices
1. Data Sanitization
# Remove sensitive data
<filter app.**>
@type record_transformer
enable_ruby true
<record>
# Mask credit card numbers
message ${record["message"].gsub(/\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b/, "****-****-****-****")}
# Remove email addresses
sanitized_message ${record["message"].gsub(/\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b/, "[EMAIL_REDACTED]")}
</record>
# Remove sensitive fields entirely
remove_keys password,secret,token,api_key,authorization
</filter>
# PII detection and handling: a <filter> cannot route events to a label,
# so copy them into a dedicated label and apply grep there
<match **>
@type copy
<store>
@type relabel
@label @PII_AUDIT
</store>
# ...add your normal delivery as additional <store> sections...
</match>
<label @PII_AUDIT>
<filter **>
@type grep
<regexp>
key message
pattern /\b\d{3}-\d{2}-\d{4}\b/ # SSN pattern
</regexp>
</filter>
<match **>
@type file
path /var/log/fluent/pii_detected.log
<format>
@type json
</format>
</match>
</label>
2. Access Control and Encryption
# Encrypted communication
<source>
@type forward
port 24224
<transport tls>
cert_path /etc/fluent/certs/server.crt
private_key_path /etc/fluent/certs/server.key
ca_cert_path /etc/fluent/certs/ca.crt
client_cert_auth true
ciphers TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256
insecure false
</transport>
<security>
self_hostname fluentd-server
shared_key "#{ENV['FLUENT_SHARED_KEY']}"
allow_anonymous_source false
<client>
host allowed-client.example.com
shared_key "#{ENV['FLUENT_SHARED_KEY']}"
</client>
</security>
</source>
15.4 Monitoring and Alerting Best Practices
1. Comprehensive Metrics Collection
# System metrics
<system>
enable_input_metrics true
enable_size_metrics true
<metrics>
@type prometheus
port 24231
bind 0.0.0.0
</metrics>
</system>
# Custom application metrics
<filter app.**>
@type prometheus
<metric>
name fluentd_app_log_total
type counter
desc Total number of application logs
<labels>
service_name ${record["service"]}
log_level ${record["level"]}
hostname ${hostname}
</labels>
</metric>
<metric>
name fluentd_app_response_time
type histogram
desc Application response time distribution
key response_time
buckets 0.1,0.5,1.0,5.0,10.0
<labels>
service_name ${record["service"]}
endpoint ${record["endpoint"]}
</labels>
</metric>
</filter>
2. Health Check Implementation
#!/bin/bash
# fluentd_health_check.sh
set -e
HEALTH_CHECK_URL="http://localhost:24220/api/plugins.json"
BUFFER_THRESHOLD=1000
ERROR_LOG="/var/log/fluent/fluentd.log"
# Check if Fluentd process is running
if ! pgrep -f fluentd > /dev/null; then
echo "CRITICAL: Fluentd process not running"
exit 2
fi
# Check HTTP endpoint
if ! curl -f -s "$HEALTH_CHECK_URL" > /dev/null; then
echo "CRITICAL: Monitor agent not responding"
exit 2
fi
# Check buffer queue lengths
BUFFER_SIZE=$(curl -s "$HEALTH_CHECK_URL" | \
jq '.plugins[] | select(.type=="output") | .buffer_queue_length // 0' | \
awk '{sum+=$1} END {print sum+0}')
if [ "$BUFFER_SIZE" -gt "$BUFFER_THRESHOLD" ]; then
echo "WARNING: High buffer queue length: $BUFFER_SIZE"
exit 1
fi
# Check for recent errors
ERROR_COUNT=$(tail -n 100 "$ERROR_LOG" | grep -c "ERROR" || true)
if [ "$ERROR_COUNT" -gt 5 ]; then
echo "WARNING: High error count in recent logs: $ERROR_COUNT"
exit 1
fi
# Check disk space for buffer directory
BUFFER_USAGE=$(df /var/log/fluent | awk 'NR==2 {print $5}' | sed 's/%//')
if [ "$BUFFER_USAGE" -gt 80 ]; then
echo "WARNING: Buffer disk usage high: ${BUFFER_USAGE}%"
exit 1
fi
echo "OK: Fluentd is healthy"
exit 0
15.5 Troubleshooting Best Practices
1. Debug Configuration Template
# Debug configuration for troubleshooting
<system>
log_level debug
suppress_repeated_stacktrace false
emit_error_log_interval 10s
</system>
# Add debug outputs
<match **>
@type copy
# Your normal output
<store>
@type elasticsearch
index_name app-logs
</store>
# Debug output (remove in production)
<store>
@type stdout
output_type json
<format>
@type json
include_time_key true
time_key debug_timestamp
</format>
</store>
# Count events for debugging
<store>
@type prometheus
<metric>
name fluentd_debug_events_total
type counter
desc Debug event counter
<labels>
tag ${tag}
hostname ${hostname}
</labels>
</metric>
</store>
</match>
2. Performance Profiling
# Add timing information
<filter **>
@type record_transformer
enable_ruby true
<record>
processing_start_time ${Time.now.to_f}
</record>
</filter>
# Measure processing time at output
<filter **>
@type record_transformer
enable_ruby true
<record>
processing_duration ${
start_time = record["processing_start_time"].to_f
Time.now.to_f - start_time
}
</record>
remove_keys processing_start_time
</filter>
# Alert on slow processing: copy events into a label and filter there,
# since a <filter> cannot redirect events to a label
<match **>
@type copy
<store>
@type relabel
@label @SLOW_PROCESSING
</store>
# ...keep your normal output as another <store> here...
</match>
<label @SLOW_PROCESSING>
<filter **>
@type grep
<regexp>
key processing_duration
pattern ^[1-9]\d*\. # More than 1 second
</regexp>
</filter>
<match **>
@type slack
webhook_url "#{ENV['SLACK_WEBHOOK']}"
channel alerts
message "Slow log processing detected: %s seconds"
message_keys processing_duration
</match>
</label>
15.6 Capacity Planning
1. Resource Estimation
graph TD
A[Log Volume Assessment] --> B[Daily Log Volume GB]
B --> C[Peak Hours Multiplier]
C --> D[Buffer Size Calculation]
E[Processing Requirements] --> F[Parse Complexity]
F --> G[Filter Operations]
G --> H[CPU Requirements]
I[Storage Requirements] --> J[Buffer Storage]
J --> K[Failed Event Storage]
K --> L[Disk Space Planning]
M[Network Bandwidth] --> N[Input Bandwidth]
N --> O[Output Bandwidth]
O --> P[Network Planning]
style A fill:#e3f2fd
style E fill:#e8f5e8
style I fill:#fff3e0
style M fill:#f3e5f5
2. Scaling Guidelines
# Small deployment (< 1GB/day)
<system>
workers 1
</system>
<match **>
@type elasticsearch
<buffer>
@type memory
chunk_limit_size 8MB
queue_limit_length 32
flush_thread_count 1
</buffer>
</match>
# Medium deployment (1-10GB/day)
<system>
workers 2
</system>
<match **>
@type elasticsearch
<buffer>
@type file
path /var/log/fluent/buffer
chunk_limit_size 32MB
queue_limit_length 128
flush_thread_count 4
</buffer>
</match>
# Large deployment (>10GB/day)
<system>
workers 4
</system>
<match **>
@type elasticsearch
<buffer>
@type file
path /var/log/fluent/buffer
chunk_limit_size 64MB
queue_limit_length 256
flush_thread_count 8
</buffer>
</match>
15.7 Migration Best Practices
1. Gradual Migration Strategy
# Phase 1: Parallel logging (old + new)
<match app.**>
@type copy
# Existing logging system
<store>
@type file
path /var/log/app/existing.log
</store>
# New Fluentd pipeline
<store>
@type elasticsearch
index_name app-logs-new
</store>
</match>
# Phase 2: Add validation
<filter app.**>
@type record_transformer
enable_ruby true
<record>
migration_phase "parallel_validation"
fluentd_processed true
</record>
</filter>
# Phase 3: Gradual traffic shifting
<match app.**>
@type copy
# 90% to new system
<store>
@type elasticsearch
index_name app-logs-new
</store>
# 10% to the old system for comparison; a <store> can declare only one
# @type, so apply a sampling filter (e.g. fluent-plugin-sampling-filter)
# upstream and keep this store as a plain file output
<store>
@type file
path /var/log/app/validation.log
</store>
</match>
15.8 Backup and Recovery
1. Configuration Backup
#!/bin/bash
# backup_fluentd_config.sh
BACKUP_DIR="/backup/fluentd"
CONFIG_DIR="/etc/fluent"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
# Create backup directory
mkdir -p "$BACKUP_DIR"
# Backup configuration
tar -czf "$BACKUP_DIR/fluentd_config_$TIMESTAMP.tar.gz" \
-C "$CONFIG_DIR" .
# Backup buffer state
tar -czf "$BACKUP_DIR/fluentd_buffer_$TIMESTAMP.tar.gz" \
-C "/var/log/fluent" buffer/
# Keep only last 30 days of backups
find "$BACKUP_DIR" -name "fluentd_*.tar.gz" -mtime +30 -delete
echo "Backup completed: $TIMESTAMP"ANSI2. Disaster Recovery Plan
# Emergency fallback configuration
<system>
log_level warn
suppress_repeated_stacktrace true
</system>
# Minimal input
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
# Local storage fallback
<match **>
@type file
path /var/log/fluent/emergency/events.%Y%m%d_%H.log
<format>
@type json
</format>
<buffer time>
@type file
path /var/log/fluent/buffer/emergency
timekey 3600
timekey_wait 60
flush_at_shutdown true
</buffer>
</match>
16. Best Practices and Patterns
16.1 Configuration Organization
Modular Configuration Structure
/etc/fluent/
├── fluent.conf # Main entry point
├── system.conf # System-wide settings
├── sources/ # Input configurations
│ ├── application.conf
│ ├── system-logs.conf
│ └── containers.conf
├── filters/ # Processing configurations
│ ├── enrichment.conf
│ ├── parsing.conf
│ └── security.conf
├── outputs/ # Destination configurations
│ ├── elasticsearch.conf
│ ├── s3-archive.conf
│ └── monitoring.conf
└── templates/ # Reusable configurations
├── common-filters.conf
└── error-handling.conf
Main Configuration Pattern
# /etc/fluent/fluent.conf
<system>
@include system.conf
</system>
# Load input configurations
@include sources/*.conf
# Load processing configurations
@include filters/*.conf
# Load output configurations
@include outputs/*.conf
# Global error handling
<label @ERROR>
<match **>
@type file
path /var/log/fluent/errors/global.%Y%m%d.log
<format>
@type json
</format>
</match>
</label>
16.2 Performance Patterns
High-Throughput Pattern
# For processing >10GB/day
<system>
workers 4
root_dir /var/log/fluent
</system>
<source>
@type tail
path /var/log/app/*.log
pos_file /var/log/fluent/app.log.pos
tag app.logs
# Performance optimizations
read_from_head false
refresh_interval 60
rotate_wait 5
enable_watch_timer false
enable_stat_watcher true
open_on_every_update false
read_lines_limit 1000
<parse>
@type none # Parse later if needed
</parse>
</source>
<match app.**>
@type elasticsearch
<buffer>
@type file
path /var/log/fluent/buffer/high-throughput
chunk_limit_size 64MB
queue_limit_length 512
flush_mode immediate
flush_thread_count 8
overflow_action drop_oldest_chunk
</buffer>
</match>
Low-Latency Pattern
# For real-time processing with <1s latency
<source>
@type forward
port 24224
chunk_size_limit 1MB
</source>
<match realtime.**>
@type elasticsearch
<buffer>
@type memory
chunk_limit_size 1MB
queue_limit_length 8
flush_mode immediate
retry_type exponential_backoff
retry_wait 0.1s
retry_max_interval 5s
</buffer>
</match>
Reliability Pattern
# For critical data that cannot be lost
<match critical.**>
@type copy
# Primary output
<store>
@type elasticsearch
host primary-es.example.com
<buffer>
@type file
path /var/log/fluent/buffer/critical-primary
chunk_limit_size 32MB
queue_limit_length 128
flush_mode interval
flush_interval 30s
flush_at_shutdown true
retry_forever true
overflow_action block
</buffer>
<secondary>
@type relabel
@label @BACKUP
</secondary>
</store>
# Synchronous backup
<store>
@type s3
s3_bucket critical-backup
path critical/
<buffer>
@type file
path /var/log/fluent/buffer/critical-backup
flush_mode immediate
</buffer>
</store>
</match>
<label @BACKUP>
<match **>
@type s3
s3_bucket emergency-backup
path emergency/
</match>
</label>
16.3 Security Patterns
Data Sanitization Pattern
<filter sensitive.**>
@type record_transformer
enable_ruby true
<record>
# Remove PII
message ${
msg = record["message"].to_s
# Credit card numbers
msg = msg.gsub(/\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b/, "****-****-****-****")
# SSN
msg = msg.gsub(/\b\d{3}-\d{2}-\d{4}\b/, "***-**-****")
# Email addresses
msg = msg.gsub(/\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b/, "[EMAIL_REDACTED]")
# Phone numbers
msg = msg.gsub(/\b\d{3}-\d{3}-\d{4}\b/, "***-***-****")
msg
}
</record>
# Remove sensitive fields
remove_keys password,secret,token,api_key,authorization,session_id
</filter>
Encryption in Transit Pattern
<source>
@type forward
port 24224
<transport tls>
cert_path /etc/fluent/certs/server.crt
private_key_path /etc/fluent/certs/server.key
ca_cert_path /etc/fluent/certs/ca.crt
client_cert_auth true
ciphers TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256
insecure false
</transport>
<security>
self_hostname fluentd-server
shared_key "#{ENV['FLUENT_SHARED_KEY']}"
allow_anonymous_source false
</security>
</source>
16.4 Monitoring Patterns
Comprehensive Monitoring Pattern
# Enable all metrics
<system>
enable_input_metrics true
enable_size_metrics true
<metrics>
@type prometheus
port 24231
bind 0.0.0.0
</metrics>
</system>
# Application-specific metrics
<filter app.**>
@type prometheus
<metric>
name app_logs_total
type counter
desc Total application logs
<labels>
service ${record["service"]}
level ${record["level"]}
environment ${record["env"]}
</labels>
</metric>
<metric>
name app_response_time_seconds
type histogram
desc Application response time
key response_time
buckets 0.1,0.5,1.0,5.0,10.0
<labels>
service ${record["service"]}
endpoint ${record["endpoint"]}
</labels>
</metric>
</filter>
# Health check endpoint
<source>
@type http
port 8888
bind 0.0.0.0
cors_allow_origins ["*"]
</source>
<match health.check>
@type exec
command /usr/local/bin/health_check.sh
<format>
@type json
</format>
</match>
17. Troubleshooting Guide
17.1 Quick Diagnosis Commands
# System health
systemctl status fluentd
ps aux | grep fluentd
netstat -tlnp | grep 24224
# Configuration validation
fluentd -c /etc/fluent/fluent.conf --dry-run
fluentd --show-plugin-config=input:tail
# Real-time monitoring
curl http://localhost:24220/api/plugins.json | jq '.'
tail -f /var/log/fluent/fluentd.log
# Performance analysis
top -p $(pgrep fluentd)
iotop -p $(pgrep fluentd)
lsof -p $(pgrep fluentd)
17.2 Common Error Messages
| Error Message | Cause | Solution |
|---|---|---|
| `failed to configure` | Syntax error in config | Check configuration syntax with `--dry-run` |
| `buffer overflow` | Output slower than input | Increase buffer size or improve output performance (see the triage sketch below) |
| `connection refused` | Network connectivity issue | Check network and firewall settings |
| `certificate verify failed` | SSL/TLS misconfiguration | Verify certificates and the CA chain |
| `permission denied` | File system permissions | Check file ownership and permissions |
| `no such file or directory` | Missing log files | Verify file paths and permissions |
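Most buffer-related errors can be triaged in a minute using the monitor_agent API together with the Fluentd log. Below is a minimal triage sketch; it assumes monitor_agent is listening on its default port 24220 and that Fluentd logs to /var/log/fluent/fluentd.log — both are assumptions, so adjust them to your installation.
#!/bin/bash
# buffer_triage.sh - illustrative sketch; adjust paths to your environment
LOG_FILE="/var/log/fluent/fluentd.log"                 # assumed log location
MONITOR_URL="http://localhost:24220/api/plugins.json"  # monitor_agent endpoint
# Recent buffer and retry warnings in the Fluentd log
grep -E "buffer overflow|failed to flush|retry succeeded" "$LOG_FILE" | tail -n 20
# Per-plugin buffer depth and retry counts from monitor_agent (requires jq)
curl -s "$MONITOR_URL" | jq -r '.plugins[]
  | select(.buffer_queue_length != null)
  | "\(.plugin_id)  queue=\(.buffer_queue_length)  bytes=\(.buffer_total_queued_size)  retries=\(.retry_count)"'
A steadily growing queue length or retry count points at the output side; a flat queue with errors in the log usually points at configuration or connectivity.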
17.3 Performance Troubleshooting
Memory Issues
# Check memory usage patterns
ps -p $(pgrep fluentd) -o pid,ppid,cmd,%mem,%cpu --sort=-%mem
# Monitor memory over time
while true; do
ps -p $(pgrep fluentd) -o %mem --no-headers
sleep 5
done
# Check for memory leaks
valgrind --tool=massif fluentd -c fluent.conf
CPU Issues
# Check CPU usage
top -p $(pgrep fluentd)
# Profile CPU usage
perf record -p $(pgrep fluentd) sleep 30
perf report
I/O Issues
# Monitor disk I/O
iotop -p $(pgrep fluentd)
# Check file descriptors
lsof -p $(pgrep fluentd) | wc -l
cat /proc/$(pgrep fluentd)/limits | grep files
18. Plugin Reference
18.1 Essential Input Plugins
| Plugin | Use Case | Configuration Example |
|---|---|---|
| `tail` | File monitoring | `@type tail; path /var/log/*.log` |
| `forward` | Receiving from other Fluentd instances | `@type forward; port 24224` |
| `http` | HTTP endpoint | `@type http; port 9880` |
| `syslog` | Syslog messages | `@type syslog; port 5140` |
| `exec` | Command output | `@type exec; command "ps aux"` |
| `dummy` | Testing/development (see the smoke-test sketch below) | `@type dummy; dummy {"hello":"world"}` |
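As a quick way to exercise a new pipeline, the dummy input pairs well with the core stdout output. A minimal smoke-test sketch — the tag name and rate are illustrative choices, not required values:
# smoke-test.conf - generate synthetic events and print them to stdout
<source>
  @type dummy
  tag test.dummy
  dummy {"hello":"world"}
  rate 1
</source>
<match test.**>
  @type stdout
</match>
Run it with fluentd -c smoke-test.conf and stop it once events start appearing on the console.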
18.2 Essential Output Plugins
| Plugin | Use Case | Configuration Example |
|---|---|---|
| `elasticsearch` | Search and analytics | `@type elasticsearch; host es.example.com` |
| `s3` | Object storage | `@type s3; s3_bucket my-logs` |
| `kafka` | Message streaming | `@type kafka2; brokers kafka:9092` |
| `file` | Local file storage | `@type file; path /var/log/output.log` |
| `forward` | Sending to other Fluentd instances | `@type forward; host fluentd.example.com` |
| `copy` | Multiple destinations | `@type copy; <store>...</store>` |
18.3 Essential Filter Plugins
| Plugin | Use Case | Configuration Example |
|---|---|---|
| `record_transformer` | Field manipulation | `@type record_transformer; <record>...</record>` |
| `grep` | Event filtering | `@type grep; <regexp>key level; pattern ERROR</regexp>` |
| `parser` | Data parsing | `@type parser; key_name message; <parse>...</parse>` |
| `geoip` | IP geolocation | `@type geoip; geoip_lookup_keys remote_addr` |
| `throttle` | Rate limiting (see the sketch below) | `@type throttle; group_bucket_limit 100` |
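A common combination from this table is grep for severity filtering followed by throttle for rate limiting. The sketch below assumes the community fluent-plugin-throttle gem is installed and that records carry level and service fields; the grouping key is an assumption and parameter names may differ between plugin versions:
# Keep only WARN/ERROR events, then cap each service at roughly 100 events per minute
<filter app.**>
  @type grep
  <regexp>
    key level
    pattern /^(WARN|ERROR)$/
  </regexp>
</filter>
<filter app.**>
  @type throttle
  group_key service              # assumed record field used for grouping
  group_bucket_period_s 60
  group_bucket_limit 100
</filter>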
18.4 Parser Plugins
| Plugin | Use Case | Example |
|---|---|---|
| `json` | JSON parsing | `@type json; json_parser yajl` |
| `regexp` | Regex parsing (see the sketch below) | `@type regexp; expression /(?<field>.*)/` |
| `csv` | CSV parsing | `@type csv; keys col1,col2,col3` |
| `apache2` | Apache logs | `@type apache2` |
| `nginx` | Nginx logs | `@type nginx` |
| `syslog` | Syslog format | `@type syslog; message_format rfc5424` |
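Parser plugins are most often applied through the parser filter to re-parse a raw message field (for example, one captured with @type none in the high-throughput pattern above). A minimal sketch against a hypothetical "timestamp level message" log format — the tag and expression are illustrative:
<filter app.raw>
  @type parser
  key_name message
  reserve_data true
  <parse>
    @type regexp
    expression /^(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (?<level>\w+) (?<message>.*)$/
    time_format %Y-%m-%d %H:%M:%S
  </parse>
</filter>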
19. Configuration Templates
19.1 Web Application Logging
# Complete web application logging setup
<system>
log_level info
workers 2
</system>
# Application logs
<source>
@type tail
path /var/log/myapp/*.log
pos_file /var/log/fluent/myapp.log.pos
tag myapp.logs
<parse>
@type json
time_key timestamp
time_format %Y-%m-%d %H:%M:%S %z
</parse>
</source>
# Nginx access logs
<source>
@type tail
path /var/log/nginx/access.log
pos_file /var/log/fluent/nginx.access.log.pos
tag nginx.access
<parse>
@type nginx
</parse>
</source>
# Nginx error logs
<source>
@type tail
path /var/log/nginx/error.log
pos_file /var/log/fluent/nginx.error.log.pos
tag nginx.error
<parse>
@type regexp
expression /^(?<time>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(?<level>\w+)\] (?<message>.*)$/
time_format %Y/%m/%d %H:%M:%S
</parse>
</source>
# Enrich all logs
<filter **>
@type record_transformer
enable_ruby true
<record>
hostname ${hostname}
environment production
service myapp
processed_at ${Time.now.utc.iso8601}
</record>
</filter>
# Filter errors for alerting
<filter *.{error,ERROR}>
@type record_transformer
<record>
alert_required true
severity high
</record>
</filter>
# Send to Elasticsearch
<match {myapp,nginx}.**>
@type elasticsearch
host elasticsearch.example.com
port 9200
index_name myapp-logs
<buffer>
@type file
path /var/log/fluent/buffer/elasticsearch
chunk_limit_size 32MB
queue_limit_length 128
flush_mode interval
flush_interval 10s
retry_type exponential_backoff
retry_forever true
</buffer>
</match>
# Archive to S3
<match **>
@type s3
s3_bucket myapp-logs-archive
path logs/year=%Y/month=%m/day=%d/hour=%H/
s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
<format>
@type json
</format>
<buffer time>
@type file
path /var/log/fluent/buffer/s3
timekey 3600
timekey_wait 300
chunk_limit_size 256MB
</buffer>
</match>
19.2 Kubernetes Logging
# Kubernetes container logging
<source>
@type tail
path /var/log/containers/*.log
pos_file /var/log/fluent/containers.log.pos
tag kubernetes.*
read_from_head true
<parse>
@type json
time_key time
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
</source>
# Add Kubernetes metadata
<filter kubernetes.**>
@type kubernetes_metadata
kubernetes_url https://kubernetes.default.svc.cluster.local
bearer_token_file /var/run/secrets/kubernetes.io/serviceaccount/token
ca_file /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
annotation_match [".*"]
</filter>
# Route by namespace
<match kubernetes.var.log.containers.**>
@type rewrite_tag_filter
<rule>
key $.kubernetes.namespace_name
pattern ^(kube-system|kube-public)$
tag system.${tag}
</rule>
<rule>
key $.kubernetes.namespace_name
pattern ^(.+)$
tag app.${tag}
</rule>
</match>
# System logs to separate index
<match system.**>
@type elasticsearch
host elasticsearch.logging.svc.cluster.local
index_name k8s-system-logs
<buffer>
@type file
path /var/log/fluent/buffer/system
flush_interval 30s
</buffer>
</match>
# Application logs
<match app.**>
@type elasticsearch
host elasticsearch.logging.svc.cluster.local
index_name k8s-app-logs
<buffer>
@type file
path /var/log/fluent/buffer/app
flush_interval 10s
</buffer>
</match>
19.3 Multi-tenant SaaS Logging
# Multi-tenant logging with tenant isolation
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
# Extract tenant information
<filter **>
@type record_transformer
enable_ruby true
<record>
tenant_id ${
# Extract from tag or record
if tag =~ /^tenant\.([^.]+)\./
$1
elsif record.key?("tenant_id")
record["tenant_id"]
elsif record.key?("organization_id")
record["organization_id"]
else
"unknown"
end
}
# Security: ensure tenant isolation
tenant_validated ${
tenant = record["tenant_id"]
allowed_tenants = ["tenant-a", "tenant-b", "tenant-c"]
allowed_tenants.include?(tenant) ? tenant : "invalid"
}
</record>
</filter>
# Reject invalid tenants
<filter **>
@type grep
<exclude>
key tenant_validated
pattern ^invalid$
</exclude>
</filter>
# Route to tenant-specific outputs
<match **>
@type rewrite_tag_filter
<rule>
key tenant_validated
pattern ^tenant-a$
tag routed.tenant-a.${tag}
</rule>
<rule>
key tenant_validated
pattern ^tenant-b$
tag routed.tenant-b.${tag}
</rule>
<rule>
key tenant_validated
pattern ^tenant-c$
tag routed.tenant-c.${tag}
</rule>
</match>
# Tenant A - Premium tier
<match routed.tenant-a.**>
@type copy
<store>
@type elasticsearch
host tenant-a-es.example.com
index_name tenant-a-logs
<buffer>
chunk_limit_size 64MB
flush_interval 5s
</buffer>
</store>
<store>
@type s3
s3_bucket tenant-a-archive
path logs/
</store>
</match>
# Tenant B - Standard tier
<match routed.tenant-b.**>
@type elasticsearch
host shared-es.example.com
index_name tenant-b-logs
<buffer>
chunk_limit_size 32MB
flush_interval 30s
</buffer>
</match>
# Tenant C - Basic tier
<match routed.tenant-c.**>
@type file
path /var/log/fluent/tenant-c/logs.%Y%m%d.log
<format>
@type json
</format>
<buffer time>
timekey 86400
timekey_wait 3600
</buffer>
</match>
19.4 High-Availability Production Setup
# Production HA configuration
<system>
workers 4
root_dir /var/log/fluent
log_level info
suppress_repeated_stacktrace true
</system>
# Secure input from forwarders
<source>
@type forward
port 24224
bind 0.0.0.0
<transport tls>
cert_path /etc/fluent/certs/server.crt
private_key_path /etc/fluent/certs/server.key
ca_cert_path /etc/fluent/certs/ca.crt
client_cert_auth true
</transport>
<security>
self_hostname "#{ENV['HOSTNAME']}"
shared_key "#{ENV['FLUENT_SHARED_KEY']}"
</security>
</source>
# Health check endpoint
<source>
@type http
port 8888
bind 0.0.0.0
</source>
# Monitoring endpoints
<source>
@type monitor_agent
port 24220
bind 0.0.0.0
</source>
<source>
@type prometheus
port 24231
bind 0.0.0.0
</source>
# Process all application logs
<match app.**>
@type copy
# Primary: Real-time search
<store>
@type elasticsearch
hosts ["es1.example.com:9200", "es2.example.com:9200", "es3.example.com:9200"]
scheme https
ssl_verify true
index_name app-logs
logstash_format true
logstash_prefix app-logs
<buffer>
@type file
path /var/log/fluent/buffer/elasticsearch
chunk_limit_size 32MB
queue_limit_length 256
flush_mode interval
flush_interval 10s
retry_type exponential_backoff
retry_forever true
</buffer>
<secondary>
@type relabel
@label @ES_BACKUP
</secondary>
</store>
# Secondary: Long-term storage
<store>
@type s3
s3_bucket production-logs
s3_region us-west-2
path logs/year=%Y/month=%m/day=%d/hour=%H/
s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
<format>
@type json
</format>
store_as gzip
<buffer time>
@type file
path /var/log/fluent/buffer/s3
timekey 3600
timekey_wait 300
chunk_limit_size 256MB
</buffer>
</store>
# Tertiary: Metrics
<store>
@type prometheus
<metric>
name app_logs_total
type counter
desc Total application logs
<labels>
service ${record["service"]}
level ${record["level"]}
environment production
</labels>
</metric>
</store>
</match>
# Elasticsearch backup path
<label @ES_BACKUP>
<match **>
@type s3
s3_bucket emergency-logs
path elasticsearch-failures/
<format>
@type json
</format>
</match>
</label>
# Error alerts
<match error.**>
@type slack
webhook_url "#{ENV['SLACK_WEBHOOK_URL']}"
channel production-alerts
username fluentd
icon_emoji :warning:
message "Production Error: %s"
message_keys message
<buffer>
flush_mode immediate
retry_max_times 3
</buffer>
</match>
# Fallback for unmatched logs
<match **>
@type file
path /var/log/fluent/unmatched/events.%Y%m%d.log
<format>
@type json
</format>
<buffer time>
timekey 86400
</buffer>
</match>
This comprehensive guide provides practical, production-ready configurations and patterns that can be adapted to various use cases and environments. Each template includes best practices for performance, reliability, and security.