Fluentd Log Collection Engine

Fluentd is an open-source data collector designed for large-scale log management. By providing a unified logging layer, it simplifies the collection, filtering, and output of log data, and it is widely used to build reliable logging infrastructure in cloud-native environments.

🎯 Fluentd Core Architecture

The Unified Logging Layer Concept

yaml
fluentd_architecture:
  unified_logging_layer:
    concept: "a single, unified log collection layer"
    benefits:
      - "unified collection from many data sources"
      - "a consistent data format"
      - "flexible routing and filtering"
      - "reliable data delivery"
    
    components:
      event_engine: "event-driven processing engine"
      plugin_system: "pluggable architecture"
      buffer_system: "buffering and retry mechanism"
      routing_engine: "tag-based routing"
  
  event_processing_model:
    event_structure:
      tag: "routing tag (e.g. app.web.access)"
      time: "timestamp (Unix time or a time object)"
      record: "record payload (JSON)"
      
    example_event: |
      {
        "tag": "app.web.access",
        "time": 1642234567,
        "record": {
          "method": "GET",
          "path": "/api/users",
          "status": 200,
          "response_time": 0.123,
          "user_agent": "curl/7.68.0"
        }
      }
    
    processing_flow:
      - "an input source receives data"
      - "the event is parsed and tagged"
      - "filters are applied"
      - "the event is routed to matching outputs"
      - "events are buffered and sent in batches"
yaml
plugin_ecosystem:
  plugin_types:
    input_plugins:
      purpose: "receive data from sources"
      examples:
        tail: "follow the tail of log files"
        forward: "receive via the Fluentd forward protocol"
        http: "receive via an HTTP endpoint"
        syslog: "receive syslog messages"
        exec: "capture command output"
        
    filter_plugins:
      purpose: "process and transform data"
      examples:
        grep: "pattern-based filtering"
        record_transformer: "record transformation"
        parser: "field parsing"
        geoip: "GeoIP lookups"
        kubernetes_metadata: "attach Kubernetes metadata"
    
    output_plugins:
      purpose: "deliver and store data"
      examples:
        elasticsearch: "Elasticsearch cluster"
        kafka: "Kafka message queue"
        s3: "Amazon S3 storage"
        forward: "forward to another Fluentd"
        file: "local file storage"
        
    buffer_plugins:
      purpose: "buffering and reliability"
      types:
        memory: "in-memory buffer (fast)"
        file: "file-backed buffer (durable)"
        
    format_plugins:
      purpose: "output formatting"
      examples:
        json: "JSON"
        csv: "CSV"
        ltsv: "labeled tab-separated values"
        msgpack: "MessagePack binary"

Configuration Syntax and Structure

ruby
# Basic configuration structure
<source>
  @type tail
  @id input_tail
  @label @mainstream
  path /var/log/httpd-access.log
  pos_file /var/log/fluentd/httpd-access.log.pos
  tag apache.access
  <parse>
    @type apache2
  </parse>
</source>

<label @mainstream>
  <filter **>
    @type stdout
  </filter>
  
  <match apache.access>
    @type elasticsearch
    host elasticsearch.default.svc.cluster.local
    port 9200
    logstash_format true
    logstash_prefix apache
    <buffer>
      @type file
      path /var/log/fluentd/buffers/apache.access
      flush_mode interval
      flush_interval 1s
    </buffer>
  </match>
</label>
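
Records that fail during processing (for example, unparsable lines when `emit_invalid_record_to_error` is enabled on a parser) are routed to Fluentd's built-in `@ERROR` label. A minimal catch-all sketch (the file path is an assumption):

ruby
# Persist records that Fluentd routes to the built-in @ERROR label
<label @ERROR>
  <match **>
    @type file
    path /var/log/fluentd/error_records
  </match>
</label>
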
yaml
configuration_best_practices:
  structure_organization:
    system_config: |
      # System-level settings
      <system>
        workers 2
        root_dir /tmp/fluentd
        log_level info
        suppress_repeated_stacktrace true
        emit_error_log_interval 30s
        suppress_config_dump true
      </system>
    
    source_separation: |
      # Group sources by origin
      <source>
        @type tail
        @id nginx_access
        @label @nginx
        # ... nginx-specific settings
      </source>
      
      <source>
        @type tail
        @id application_logs
        @label @application
        # ... application log settings
      </source>
    
    label_organization: |
      # Use labels to separate processing pipelines
      <label @nginx>
        <filter nginx.access>
          @type parser
          # nginx-specific processing
        </filter>
        
        <match nginx.**>
          @type elasticsearch
          # nginx output settings
        </match>
      </label>
  
  performance_configuration:
    worker_optimization: |
      <system>
        workers 4                    # typically the number of CPU cores
      </system>
      
      # Plugins that do not support multiple workers can be pinned:
      <worker 0>
        <source>
          @type tail
          # ...
        </source>
      </worker>
    
    buffer_optimization: |
      <buffer>
        @type file
        path /var/log/fluentd/buffer/
        
        # Flush policy
        flush_mode interval
        flush_interval 5s
        flush_at_shutdown true
        
        # Chunk sizing
        chunk_limit_size 32m
        chunk_limit_records 100000
        
        # Queue settings
        queue_limit_length 128
        
        # Retry policy
        retry_type exponential_backoff
        retry_wait 1s
        retry_max_interval 60s
        retry_timeout 1h
        
        # Overflow handling
        overflow_action drop_oldest_chunk
      </buffer>
    
    memory_optimization: |
      # File permissions and rotation of Fluentd's own logs
      <system>
        file_permission 0644
        dir_permission 0755
        log_rotate_age 5
        log_rotate_size 52428800  # 50MB
      </system>
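
A quick sizing check helps when choosing buffer values: the worst-case disk footprint of a file buffer is bounded by the chunk size times the queue length. With the settings above:

ruby
# Worked example for the buffer settings above (a rough upper bound):
#   max buffered data ≈ chunk_limit_size × queue_limit_length
#                     = 32 MiB × 128 = 4096 MiB ≈ 4 GiB per buffer path
# Size the buffer volume (or lower queue_limit_length) accordingly.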

🔧 Kubernetes Integration and Deployment

DaemonSet Deployment Pattern

yaml
# fluentd-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: kube-system
  labels:
    k8s-app: fluentd-logging
    version: v1
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-logging
      version: v1
  template:
    metadata:
      labels:
        k8s-app: fluentd-logging
        version: v1
    spec:
      serviceAccountName: fluentd
      tolerations:
      - key: node-role.kubernetes.io/master
        effect: NoSchedule
      - key: node-role.kubernetes.io/control-plane
        effect: NoSchedule
      containers:
      - name: fluentd
        image: fluent/fluentd-kubernetes-daemonset:v1-debian-elasticsearch
        env:
        - name: FLUENT_ELASTICSEARCH_HOST
          value: "elasticsearch.logging.svc.cluster.local"
        - name: FLUENT_ELASTICSEARCH_PORT
          value: "9200"
        - name: FLUENT_ELASTICSEARCH_SCHEME
          value: "http"
        - name: FLUENTD_SYSTEMD_CONF
          value: disable
        - name: FLUENT_UID
          value: "0"
        
        resources:
          limits:
            memory: 512Mi
            cpu: 200m
          requests:
            memory: 256Mi
            cpu: 100m
        
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: fluentd-config
          mountPath: /fluentd/etc
      
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: fluentd-config
        configMap:
          name: fluentd-config
yaml
# fluentd-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-config
  namespace: kube-system
data:
  fluent.conf: |
    <system>
      root_dir /tmp/fluentd-buffers/
      log_level info
    </system>

    # Input: container logs
    <source>
      @type tail
      @id in_tail_container_logs
      path /var/log/containers/*.log
      pos_file /var/log/fluentd-containers.log.pos
      tag kubernetes.*
      read_from_head true
      <parse>
        @type json
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>

    # Filter: enrich with Kubernetes metadata
    <filter kubernetes.**>
      @type kubernetes_metadata
      @id filter_kube_metadata
      kubernetes_url "#{ENV['FLUENT_FILTER_KUBERNETES_URL'] || 'https://' + ENV['KUBERNETES_SERVICE_HOST'] + ':' + ENV['KUBERNETES_SERVICE_PORT'] + '/api'}"
      verify_ssl "#{ENV['KUBERNETES_VERIFY_SSL'] || true}"
      ca_file "#{ENV['KUBERNETES_CA_FILE']}"
      skip_labels "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_LABELS'] || 'false'}"
      skip_container_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_CONTAINER_METADATA'] || 'false'}"
      skip_master_url "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_MASTER_URL'] || 'false'}"
      skip_namespace_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_NAMESPACE_METADATA'] || 'false'}"
    </filter>

    # Filter: parse the log field inside each record
    <filter kubernetes.var.log.containers.**.log>
      @type parser
      @id filter_parser
      key_name log
      reserve_data true
      remove_key_name_field true
      <parse>
        @type multi_format
        <pattern>
          format json
        </pattern>
        <pattern>
          format none
        </pattern>
      </parse>
    </filter>

    # Output: Elasticsearch
    <match kubernetes.**>
      @type elasticsearch
      @id out_es
      @log_level info
      include_tag_key true
      host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
      port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
      scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
      ssl_verify "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERIFY'] || 'true'}"
      ssl_version "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERSION'] || 'TLSv1_2'}"
      reload_connections "#{ENV['FLUENT_ELASTICSEARCH_RELOAD_CONNECTIONS'] || 'false'}"
      reconnect_on_error "#{ENV['FLUENT_ELASTICSEARCH_RECONNECT_ON_ERROR'] || 'true'}"
      reload_on_failure "#{ENV['FLUENT_ELASTICSEARCH_RELOAD_ON_FAILURE'] || 'true'}"
      log_es_400_reason "#{ENV['FLUENT_ELASTICSEARCH_LOG_ES_400_REASON'] || 'false'}"
      logstash_prefix "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_PREFIX'] || 'logstash'}"
      logstash_format "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_FORMAT'] || 'true'}"
      index_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_INDEX_NAME'] || 'logstash'}"
      type_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_TYPE_NAME'] || 'fluentd'}"
      
      <buffer>
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action drop_oldest_chunk
      </buffer>
    </match>
yaml
# fluentd-rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd
  namespace: kube-system

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: fluentd
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - namespaces
  verbs:
  - get
  - list
  - watch

---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd
roleRef:
  kind: ClusterRole
  name: fluentd
  apiGroup: rbac.authorization.k8s.io
subjects:
- kind: ServiceAccount
  name: fluentd
  namespace: kube-system
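
The DaemonSet above assumes Docker-based nodes (container logs under /var/lib/docker/containers). On containerd/CRI-O clusters the kubelet writes logs under /var/log/pods instead, and the line format is CRI text rather than Docker JSON (commonly handled with the separate fluent-plugin-parser-cri parser). A sketch of the volume change, as an assumption to verify on your nodes:

yaml
# Replace the Docker-specific mount on containerd/CRI-O nodes
volumeMounts:
- name: varlogpods
  mountPath: /var/log/pods
  readOnly: true
volumes:
- name: varlogpods
  hostPath:
    path: /var/log/pods
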

Sidecar Deployment Pattern

yaml
# sidecar-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: webapp-with-logging
spec:
  replicas: 3
  selector:
    matchLabels:
      app: webapp
  template:
    metadata:
      labels:
        app: webapp
    spec:
      containers:
      # Main application container
      - name: webapp
        image: nginx:1.21
        ports:
        - containerPort: 80
        volumeMounts:
        - name: shared-logs
          mountPath: /var/log/nginx
        
      # Fluentd sidecar container
      - name: fluentd
        image: fluent/fluentd:v1.14-debian-1
        volumeMounts:
        - name: shared-logs
          mountPath: /var/log/nginx
          readOnly: true
        - name: fluentd-config
          mountPath: /fluentd/etc
        env:
        - name: FLUENTD_CONF
          value: fluent.conf
      
      volumes:
      - name: shared-logs
        emptyDir: {}
      - name: fluentd-config
        configMap:
          name: webapp-fluentd-config

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: webapp-fluentd-config
data:
  fluent.conf: |
    <source>
      @type tail
      path /var/log/nginx/access.log
      pos_file /var/log/fluentd/nginx-access.log.pos
      tag webapp.nginx.access
      <parse>
        @type nginx
      </parse>
    </source>
    
    <source>
      @type tail
      path /var/log/nginx/error.log
      pos_file /var/log/fluentd/nginx-error.log.pos
      tag webapp.nginx.error
      <parse>
        @type multiline
        format_firstline /^\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2}/
        format1 /^(?<timestamp>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2}) \[(?<log_level>\w+)\] (?<message>.*)/
      </parse>
    </source>
    
    <match webapp.**>
      @type forward
      <server>
        host fluentd-aggregator.logging.svc.cluster.local
        port 24224
      </server>
      <buffer>
        @type file
        path /var/log/fluentd/buffer
        flush_mode interval
        flush_interval 3s
      </buffer>
    </match>
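
If loss between sidecar and aggregator matters more than latency, the forward output can require per-chunk acknowledgements. A sketch of the one-line change to the match block above:

ruby
<match webapp.**>
  @type forward
  require_ack_response true    # retry chunks the aggregator never acknowledges
  <server>
    host fluentd-aggregator.logging.svc.cluster.local
    port 24224
  </server>
  <buffer>
    @type file
    path /var/log/fluentd/buffer
  </buffer>
</match>
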
yaml
# fluentd-aggregator.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fluentd-aggregator
  namespace: logging
spec:
  replicas: 2
  selector:
    matchLabels:
      app: fluentd-aggregator
  template:
    metadata:
      labels:
        app: fluentd-aggregator
    spec:
      containers:
      - name: fluentd
        image: fluent/fluentd:v1.14-debian-1
        ports:
        - containerPort: 24224
          name: forward
        - containerPort: 9880
          name: http
        volumeMounts:
        - name: config
          mountPath: /fluentd/etc
        - name: buffer
          mountPath: /var/log/fluentd
        env:
        - name: FLUENTD_CONF
          value: fluent.conf
        
        resources:
          requests:
            memory: 512Mi
            cpu: 200m
          limits:
            memory: 1Gi
            cpu: 500m
      
      volumes:
      - name: config
        configMap:
          name: fluentd-aggregator-config
      - name: buffer
        persistentVolumeClaim:
          claimName: fluentd-buffer-pvc

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-aggregator-config
  namespace: logging
data:
  fluent.conf: |
    <system>
      workers 2
      root_dir /var/log/fluentd
    </system>
    
    # Receive logs from the sidecars
    <source>
      @type forward
      @id forward_input
      port 24224
      bind 0.0.0.0
    </source>
    
    # Health-check endpoint
    <source>
      @type http
      @id http_input
      port 9880
      bind 0.0.0.0
    </source>
    
    # Filtering and enrichment
    <filter webapp.**>
      @type record_transformer
      <record>
        cluster "#{ENV['CLUSTER_NAME'] || 'default'}"
        datacenter "#{ENV['DATACENTER'] || 'unknown'}"
      </record>
    </filter>
    
    # Fan out to multiple destinations
    <match webapp.nginx.access>
      @type copy
      <store>
        @type elasticsearch
        host elasticsearch.logging.svc.cluster.local
        port 9200
        index_name webapp-access-%Y.%m.%d
        type_name access_log
        
        <buffer time>
          timekey 3600  # 1 hour
          timekey_wait 60
          timekey_use_utc true
        </buffer>
      </store>
      
      <store>
        @type s3
        aws_key_id "#{ENV['AWS_ACCESS_KEY_ID']}"
        aws_sec_key "#{ENV['AWS_SECRET_ACCESS_KEY']}"
        s3_bucket logs-backup
        s3_region us-west-2
        path webapp/access_logs/
        
        <buffer time>
          timekey 86400  # 1 day
          chunk_limit_size 256m
        </buffer>
        
        <format>
          @type json
        </format>
      </store>
    </match>
    
    <match webapp.nginx.error>
      @type elasticsearch
      host elasticsearch.logging.svc.cluster.local
      port 9200
      index_name webapp-errors-%Y.%m.%d
      type_name error_log
      
      <buffer time>
        timekey 3600
        timekey_wait 60
      </buffer>
    </match>
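
The aggregator Deployment references a fluentd-buffer-pvc claim that is not defined above. A minimal sketch (size and storage class are assumptions; note that with replicas: 2 a single ReadWriteOnce claim binds to one node only, so a StatefulSet with volumeClaimTemplates is the usual production shape):

yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: fluentd-buffer-pvc
  namespace: logging
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
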

🚀 Advanced Configuration and Optimization

Data Processing and Routing

ruby
# Multi-format log parsing
<source>
  @type tail
  path /var/log/application/*.log
  pos_file /var/log/fluentd/application.log.pos
  tag app.mixed
  <parse>
    @type multi_format
    <pattern>
      format json
      time_key timestamp
      time_format %Y-%m-%dT%H:%M:%S.%L%z
    </pattern>
    <pattern>
      format /^(?<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) \[(?<level>\w+)\] (?<logger>\S+) - (?<message>.*)$/
      time_format %Y-%m-%d %H:%M:%S.%L
    </pattern>
    <pattern>
      format none
    </pattern>
  </parse>
</source>

# Structured record enrichment
<filter app.mixed>
  @type record_transformer
  enable_ruby true
  <record>
    # Add host information
    hostname "#{Socket.gethostname}"
    
    # Derive the HTTP status class (200, 300, ...)
    status_category ${record["status_code"] ? (record["status_code"].to_i / 100) * 100 : "unknown"}
    
    # Bucket the response time
    response_time_bucket ${
      case record["response_time"].to_f
      when 0...0.1
        "fast"
      when 0.1...0.5
        "normal"
      when 0.5...2.0
        "slow"
      else
        "very_slow"
      end
    }
    
    # Extract the user ID (defaults to "anonymous")
    user_id ${record["user_agent"] =~ /user_id=(\d+)/ ? $1 : "anonymous"}
  </record>
</filter>

# Sensitive data handling
<filter app.**>
  @type grep
  <exclude>
    key message
    pattern /password|secret|token|key/i
  </exclude>
</filter>

<filter app.**>
  @type record_transformer
  enable_ruby true
  <record>
    # Mask credit card numbers and email addresses. Both substitutions are
    # chained in one expression: duplicate keys inside <record> would
    # overwrite each other, so the last one would win.
    message ${record["message"].to_s.gsub(/\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b/, "****-****-****-****").gsub(/\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b/, "[EMAIL]")}
  </record>
</filter>
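
Illustrative effect of the masking filter above (hypothetical values):

ruby
#   in:  {"message": "card=4111 1111 1111 1111 contact=bob@example.com"}
#   out: {"message": "card=****-****-****-**** contact=[EMAIL]"}
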
ruby
# Content-based routing
<match app.mixed>
  @type rewrite_tag_filter
  <rule>
    key level
    pattern /^ERROR$/
    tag app.error
  </rule>
  <rule>
    key level
    pattern /^WARN$/
    tag app.warn
  </rule>
  <rule>
    key logger
    pattern /security/
    tag app.security
  </rule>
  <rule>
    key message
    pattern /payment|transaction|billing/i
    tag app.payment
  </rule>
  <rule>
    key level
    pattern /.*/
    tag app.normal
  </rule>
</match>

# Error log handling
<match app.error>
  @type copy
  <store>
    @type elasticsearch
    host elasticsearch-errors.logging.svc.cluster.local
    port 9200
    index_name errors-%Y.%m.%d
    type_name error_log
    
    <buffer time>
      timekey 3600
      timekey_wait 60
    </buffer>
  </store>
  
  <store>
    @type slack
    webhook_url https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
    channel "#alerts"
    username "Fluentd"
    title "Application Error Alert"
    message "Error in %s: %s"
    message_keys hostname,message
    
    <buffer>
      flush_mode immediate
    </buffer>
  </store>
  
  <store>
    @type file
    path /var/log/fluentd/errors/error.%Y%m%d.log
    append true
    
    <buffer time>
      timekey 86400  # 1 day
    </buffer>
    
    <format>
      @type json
    </format>
  </store>
</match>

# Dedicated handling for security logs
<match app.security>
  @type elasticsearch
  host elasticsearch-security.logging.svc.cluster.local
  port 9200
  index_name security-%Y.%m.%d
  type_name security_log
  
  <buffer time>
    timekey 3600
    timekey_wait 60
  </buffer>
</match>

# Payment-related logs
<match app.payment>
  @type copy
  <store>
    @type elasticsearch
    host elasticsearch-payment.logging.svc.cluster.local
    port 9200
    index_name payment-%Y.%m.%d
    type_name payment_log
    
    <buffer time>
      timekey 3600
      timekey_wait 60
    </buffer>
  </store>
  
  # Back up to S3 for compliance
  <store>
    @type s3
    aws_key_id "#{ENV['AWS_ACCESS_KEY_ID']}"
    aws_sec_key "#{ENV['AWS_SECRET_ACCESS_KEY']}"
    s3_bucket compliance-logs
    s3_region us-west-2
    path payment/%Y/%m/%d/
    
    <buffer time>
      timekey 86400
      chunk_limit_size 100m
    </buffer>
    
    <format>
      @type json
    </format>
  </store>
</match>

# Default handling for everything else
<match app.normal>
  @type elasticsearch
  host elasticsearch.logging.svc.cluster.local
  port 9200
  logstash_format true
  logstash_prefix application
  
  <buffer time>
    timekey 3600
    timekey_wait 60
    chunk_limit_size 64m
    queue_limit_length 128
  </buffer>
</match>
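
Several plugins used above ship as separate gems rather than with Fluentd core, so a custom image typically installs them at build time. A sketch (gem names as published on rubygems.org):

ruby
# fluent-gem install fluent-plugin-rewrite-tag-filter \
#   fluent-plugin-elasticsearch fluent-plugin-slack fluent-plugin-s3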

Performance Optimization Strategies

yaml
performance_optimization:
  system_configuration:
    worker_optimization: |
      <system>
        workers 4                    # scale with the CPU core count
        
        # Logging
        log_level warn               # reduce Fluentd's own log output
        suppress_repeated_stacktrace true
        emit_error_log_interval 30s
        suppress_config_dump true
      </system>
    
    memory_management:
      gc_tuning: |
        # Ruby GC tuning via environment variables
        RUBY_GC_HEAP_INIT_SLOTS: "1000000"
        RUBY_GC_HEAP_FREE_SLOTS: "500000"
        RUBY_GC_HEAP_GROWTH_FACTOR: "1.2"
        RUBY_GC_HEAP_GROWTH_MAX_SLOTS: "300000"
        RUBY_GC_MALLOC_LIMIT: "90000000"
        RUBY_GC_MALLOC_LIMIT_MAX: "180000000"
        RUBY_GC_OLDMALLOC_LIMIT: "90000000"
        RUBY_GC_OLDMALLOC_LIMIT_MAX: "180000000"
      
      buffer_optimization: |
        <buffer>
          @type file
          path /var/log/fluentd/buffer/
          
          # Flush tuning
          flush_mode interval
          flush_interval 5s
          flush_at_shutdown true
          
          # Chunk tuning
          chunk_limit_size 32m
          chunk_limit_records 100000
          
          # Queue tuning
          queue_limit_length 256
          
          # Retry tuning
          retry_type exponential_backoff
          retry_wait 1s
          retry_max_interval 300s
          retry_timeout 72h
          retry_forever false
          
          # Overflow handling
          overflow_action drop_oldest_chunk
        </buffer>
  
  plugin_optimization:
    input_optimization:
      tail_plugin: |
        <source>
          @type tail
          path /var/log/app/*.log
          pos_file /var/log/fluentd/app.log.pos
          
          # Throughput tuning
          read_from_head false
          read_lines_limit 1000
          multiline_flush_interval 5s
          path_timezone "+09:00"   # only used when path contains time placeholders
          
          # File-handling tuning
          ignore_repeated_permission_error true
          refresh_interval 60
          enable_stat_watcher true
          
          <parse>
            @type json
            # Parser tuning
            keep_time_key true
            time_type string
          </parse>
        </source>
    
    filter_optimization:
      efficient_filtering: |
        # Pre-filter with grep before heavier transforms
        <filter app.**>
          @type grep
          <regexp>
            key level
            pattern ^(ERROR|WARN|INFO)$
          </regexp>
        </filter>
        
        # Keep transforms cheap to reduce CPU usage
        <filter app.**>
          @type record_transformer
          enable_ruby false  # avoid per-record Ruby evaluation for speed
          <record>
            environment "#{ENV['ENVIRONMENT'] || 'unknown'}"
            datacenter "#{ENV['DATACENTER'] || 'unknown'}"
          </record>
        </filter>
    
    output_optimization:
      elasticsearch_tuning: |
        <match **>
          @type elasticsearch
          host elasticsearch.logging.svc.cluster.local
          port 9200
          
          # Connection tuning
          reload_connections false
          reconnect_on_error true
          reload_on_failure true
          
          # Bulk-request tuning
          bulk_message_request_threshold 20971520  # 20MB
          
          # Index template management
          template_name fluentd
          template_overwrite false
          
          <buffer>
            # Flush tuning
            flush_mode interval
            flush_interval 5s
            flush_thread_count 4
            
            chunk_limit_size 32m
            chunk_limit_records 50000
            queue_limit_length 512
            
            # Retry tuning
            retry_type exponential_backoff
            retry_wait 2s
            retry_max_interval 300s
            retry_forever false
          </buffer>
        </match>
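
The GC variables from the table above are applied as container environment variables in practice. A sketch for the DaemonSet shown earlier (values are starting points from the table, not benchmarked numbers):

yaml
env:
- name: RUBY_GC_HEAP_INIT_SLOTS
  value: "1000000"
- name: RUBY_GC_HEAP_GROWTH_FACTOR
  value: "1.2"
- name: RUBY_GC_MALLOC_LIMIT
  value: "90000000"
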
yaml
monitoring_configuration:
  metrics_collection:
    prometheus_integration: |
      # Expose metrics (requires fluent-plugin-prometheus)
      <source>
        @type prometheus
        bind 0.0.0.0
        port 24231
        metrics_path /metrics
      </source>
      
      <source>
        @type prometheus_monitor
        <labels>
          host ${hostname}
        </labels>
      </source>
      
      <source>
        @type prometheus_output_monitor
        <labels>
          host ${hostname}
        </labels>
      </source>
    
    health_checks:
      http_endpoint: |
        <source>
          @type http
          port 9880
          bind 0.0.0.0
          
          # Optional TLS for the endpoint
          <transport tls>
            ca_path /etc/ssl/certs/ca.pem
            cert_path /etc/ssl/certs/server.pem
            private_key_path /etc/ssl/private/server.key
            client_cert_auth false
          </transport>
        </source>
      
      liveness_probe: |
        # Kubernetes liveness probe
        livenessProbe:
          httpGet:
            path: /fluentd.healthcheck?json=%7B%22ping%22%3A+%22pong%22%7D
            port: 9880
          initialDelaySeconds: 5
          periodSeconds: 5
        
        readinessProbe:
          httpGet:
            path: /fluentd.healthcheck?json=%7B%22ping%22%3A+%22pong%22%7D
            port: 9880
          initialDelaySeconds: 10
          periodSeconds: 5
  
  debugging_tools:
    trace_logging: |
      # Enable verbose logging
      <system>
        log_level trace
        suppress_repeated_stacktrace false
      </system>
      
      # Add a debug output
      <match debug.**>
        @type stdout
        <format>
          @type json
        </format>
      </match>
    
    performance_profiling: |
      # Runtime inspection (core debug_agent input)
      <source>
        @type debug_agent
        bind 127.0.0.1
        port 24230
      </source>
      
      # Track slow processing (requires fluent-plugin-elapsed-time)
      <filter **>
        @type elapsed_time
        tag elapsed
      </filter>
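
With the Prometheus sources above in place, buffer backlog is a useful first alert. A sketch of a Prometheus rule (the metric name comes from fluent-plugin-prometheus; verify it against the installed plugin version):

yaml
groups:
- name: fluentd
  rules:
  - alert: FluentdBufferBacklog
    expr: fluentd_output_status_buffer_queue_length > 100
    for: 10m
    labels:
      severity: warning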

📋 Fluentd Interview Highlights

Core Concepts

  1. What is Fluentd's core design philosophy?

    • The Unified Logging Layer
    • Event-driven architecture
    • Plugin-based design
    • Reliable data delivery
  2. What are the main differences between Fluentd and Logstash?

    • Memory footprint: Fluentd is lighter
    • Plugin ecosystems: each has its strengths
    • Configuration syntax: different DSLs
    • Performance: each suits different scenarios
  3. What does Fluentd's event model consist of?

    • Tag: the routing label
    • Time: the timestamp
    • Record: the payload data
    • How events flow through the pipeline

Kubernetes Integration

  1. What are the deployment patterns for Fluentd on Kubernetes?

    • DaemonSet: node-level collection
    • Sidecar: application-level collection
    • Aggregator: centralized processing
    • When each pattern fits
  2. How are Kubernetes container logs handled?

    • Container log paths on the node
    • Configuring the metadata filter
    • Correlating Pod and Container information
    • Multi-line log handling
  3. How are Fluentd's permissions and security configured?

    • RBAC rules
    • ServiceAccount setup
    • Sensitive data handling
    • Transport encryption

Advanced Configuration

  1. How is complex log routing implemented?

    • The rewrite_tag_filter plugin
    • Conditional matching and tag rewriting
    • Multiple output destinations
    • Routing performance considerations
  2. How do Fluentd's buffering and retry mechanisms work?

    • Buffer plugin types
    • Flush policy configuration
    • Choosing a retry algorithm
    • Overflow handling options
  3. How is performance optimized at large scale?

    • Multi-worker configuration
    • Memory and GC tuning
    • Buffer tuning
    • Monitoring and diagnostics

As a log collector widely adopted in cloud-native environments, Fluentd's unified-logging-layer design and flexible plugin architecture provide a solid foundation for building reliable logging infrastructure.
