Logstash Data Processing Pipeline
Logstash is a powerful data processing engine. Using a three-stage Input → Filter → Output pipeline model, it collects, transforms, enriches, and routes data, making it the core data processing component of the ELK stack.
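As a minimal sketch of the three stages working together (the Beats port, index name, and Elasticsearch address below are placeholder assumptions, not values prescribed by this document), a pipeline definition typically looks like this:

```conf
# Minimal end-to-end pipeline: receive from Beats, parse, index into Elasticsearch.
input {
  beats {
    port => 5044                                         # assumed Beats listener port
  }
}

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }     # parse Apache access-log lines
  }
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]   # promote the log time to @timestamp
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]                   # assumed single-node cluster
    index => "web-logs-%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }                          # keep a console copy while developing
}
```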
🏗️ Logstash Architecture Principles
Pipeline Processing Model
```yaml
logstash_architecture:
  input_stage:
    purpose: "Data ingestion and reception"
    characteristics:
      - "Multiple data source support"
      - "Concurrent data collection"
      - "Back-pressure management"
      - "Failure recovery"
    common_inputs:
      beats: "Receive from Filebeat, Metricbeat, and other Beats"
      file: "Watch files for changes"
      http: "Accept HTTP requests"
      tcp_udp: "Listen on network protocols"
      kafka: "Consume from message queues"
      redis: "Read from cache queues"
      jdbc: "Query databases"
    configuration_pattern: |
      input {
        beats {
          port => 5044
          host => "0.0.0.0"
        }
        file {
          path => "/var/log/apache/*.log"
          start_position => "beginning"
          sincedb_path => "/dev/null"
        }
      }
  filter_stage:
    purpose: "Data parsing, transformation, and enrichment"
    capabilities:
      - "Structured parsing"
      - "Field extraction and mapping"
      - "Data type conversion"
      - "Content enrichment and filtering"
    core_filters:
      grok: "Regular-expression pattern matching"
      mutate: "Field manipulation and conversion"
      date: "Timestamp parsing"
      json: "JSON parsing"
      csv: "CSV parsing"
      geoip: "Geo-location lookup"
      useragent: "User-agent parsing"
    processing_pattern: |
      filter {
        # Parse Apache access logs
        grok {
          match => { "message" => "%{COMBINEDAPACHELOG}" }
        }
        # Parse the timestamp
        date {
          match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
        }
        # Geo-location enrichment
        geoip {
          source => "clientip"
          target => "geoip"
        }
      }
  output_stage:
    purpose: "Data routing and output"
    features:
      - "Multiple output targets"
      - "Conditional routing"
      - "Bulk operations"
      - "Error handling"
    common_outputs:
      elasticsearch: "Index into Elasticsearch"
      kafka: "Publish to a message queue"
      file: "Write to the file system"
      http: "Send HTTP requests"
      email: "Email notifications"
      stdout: "Standard output for debugging"
    routing_pattern: |
      output {
        if [level] == "ERROR" {
          email {
            to => "admin@company.com"
            subject => "Application Error"
            body => "Error: %{message}"
          }
        }
        elasticsearch {
          hosts => ["es1:9200", "es2:9200"]
          index => "logs-%{+YYYY.MM.dd}"
          template_name => "logs"
        }
      }
```
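When developing the filter stage shown above, a throwaway pipeline that reads from stdin and pretty-prints the parsed event is a convenient way to iterate. This is an illustrative sketch, not part of the original text; it assumes the same Apache log format:

```conf
# Development sketch: paste sample log lines on stdin and inspect the parsed event.
input {
  stdin { }                                            # read test lines from the terminal
}

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
    tag_on_failure => ["_grokparsefailure"]            # keep failures visible while iterating
  }
}

output {
  stdout { codec => rubydebug }                        # print every field of the event
}
```

Run it with `bin/logstash -f debug.conf` (the file name is assumed) and adjust the grok pattern until the printed fields look right.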
```yaml
execution_model:
  pipeline_workers:
    concept: "Parallel processing pipelines"
    configuration:
      pipeline.workers: "Number of CPU cores"
      pipeline.batch.size: 125
      pipeline.batch.delay: 50
    work_distribution:
      - "Each worker processes events independently"
      - "The input stage feeds a shared queue"
      - "The filter stage runs in parallel"
      - "The output stage sends in batches"
  memory_management:
    event_queue:
      type: "In-memory queue"
      size: "pipeline.batch.size * pipeline.workers * 2"
      overflow: "Back-pressure mechanism that blocks inputs"
    persistent_queue:
      purpose: "Disk-backed persistent queue"
      benefits:
        - "Data durability guarantees"
        - "Data recovery after a restart"
        - "Back-pressure relief"
        - "Batching optimization"
      configuration: |
        queue.type: persisted
        queue.page_capacity: 64mb
        queue.max_events: 0 # unlimited
        queue.max_bytes: 1024mb
  performance_characteristics:
    throughput_factors:
      - "Input data rate"
      - "Filter complexity"
      - "Output target performance"
      - "System resource limits"
    latency_components:
      - "Data parsing time"
      - "Filter processing time"
      - "Queue wait time"
      - "Output write time"
    bottleneck_identification:
      input_bottleneck: "Input rate cannot keep up with data production"
      filter_bottleneck: "Complex processing logic is slow"
      output_bottleneck: "Write latency in the target system"
      system_bottleneck: "CPU, memory, and I/O limits"
```

Multi-Pipeline Architecture
```yaml
multi_pipeline_architecture:
  pipeline_separation:
    by_data_source:
      apache_pipeline:
        config: "pipelines/apache.conf"
        purpose: "Process Apache access logs"
        workers: 4
      application_pipeline:
        config: "pipelines/application.conf"
        purpose: "Process application logs"
        workers: 8
    by_processing_complexity:
      simple_pipeline:
        purpose: "Simple structured logs"
        workers: 2
        optimization: "High-throughput configuration"
      complex_pipeline:
        purpose: "Complex unstructured logs"
        workers: 4
        optimization: "Processing-capacity optimization"
    by_output_destination:
      elasticsearch_pipeline:
        output: "Elasticsearch cluster"
        batch_size: 500
      kafka_pipeline:
        output: "Kafka message queue"
        batch_size: 1000
  pipeline_configuration: |
    # pipelines.yml
    - pipeline.id: apache-logs
      path.config: "/etc/logstash/pipelines/apache.conf"
      pipeline.workers: 4
      pipeline.batch.size: 250
    - pipeline.id: app-logs
      path.config: "/etc/logstash/pipelines/application.conf"
      pipeline.workers: 8
      pipeline.batch.size: 125
      queue.type: persisted
    - pipeline.id: metrics
      path.config: "/etc/logstash/pipelines/metrics.conf"
      pipeline.workers: 2
      pipeline.batch.size: 1000
  cross_pipeline_communication:
    pipeline_to_pipeline: |
      # Sending pipeline
      output {
        pipeline {
          send_to => "enrichment-pipeline"
        }
      }
      # Receiving pipeline
      input {
        pipeline {
          address => "enrichment-pipeline"
        }
      }
    use_cases:
      - "Separating data pre-processing"
      - "Shared enrichment logic"
      - "Multi-stage processing"
      - "Isolating error handling"
```
```yaml
pipeline_management:
  monitoring_metrics:
    throughput_metrics:
      - "events.in (events received)"
      - "events.out (events emitted)"
      - "events.filtered (events filtered)"
      - "events.duration_in_millis (processing time)"
    performance_metrics:
      - "pipeline.workers (worker threads)"
      - "pipeline.batch_size (batch size)"
      - "queue.events (events in the queue)"
      - "queue.size_in_bytes (queue size)"
    error_metrics:
      - "events.failed (failed events)"
      - "pipeline.reloads.failures (reload failures)"
      - "dead_letter_queue.dropped_events (dropped events)"
  hot_reload:
    automatic_reload:
      configuration: "config.reload.automatic: true"
      interval: "config.reload.interval: 3s"
    manual_reload:
      signal: "kill -SIGHUP <logstash_pid> (forces a configuration reload)"
      cli_command: "systemctl reload logstash"
    reload_behavior:
      - "Does not interrupt in-flight event processing"
      - "The new configuration applies to new events"
      - "Gradual switchover"
      - "Configuration validation"
  pipeline_scaling:
    vertical_scaling:
      worker_adjustment: "Increase pipeline.workers"
      memory_tuning: "Adjust the JVM heap size"
      batch_optimization: "Optimize batch processing"
    horizontal_scaling:
      load_balancing: "Multiple Logstash instances"
      data_partitioning: "Partition by data source"
      geographic_distribution: "Geographic distribution"
```

🔧 Core Plugins in Depth
Input Plugins in Detail
```yaml
beats_input:
  basic_configuration: |
    input {
      beats {
        port => 5044
        host => "0.0.0.0"
        client_inactivity_timeout => 60
        include_codec_tag => false
      }
    }
  advanced_features:
    ssl_configuration: |
      input {
        beats {
          port => 5044
          ssl => true
          ssl_certificate => "/path/to/cert.crt"
          ssl_key => "/path/to/private.key"
          ssl_verify_mode => "peer"
          ssl_certificate_authorities => ["/path/to/ca.crt"]
        }
      }
    legacy_options:
      configuration: |
        input {
          beats {
            port => 5044
            # Deprecated in newer plugin versions; shown here only for older deployments
            congestion_threshold => 40
            target_field_for_codec => "message"
          }
        }
  performance_tuning:
    throughput_optimization:
      - "Increase client_inactivity_timeout"
      - "Adjust congestion_threshold (legacy plugin versions only)"
      - "Tune network buffers"
      - "Review TLS settings if encryption is enabled"
  integration_patterns:
    filebeat_integration: |
      # Filebeat configuration
      output.logstash:
        hosts: ["logstash1:5044", "logstash2:5044"]
        loadbalance: true
        compression_level: 3
      # Logstash configuration
      input {
        beats {
          port => 5044
        }
      }
    metricbeat_integration: |
      # Metricbeat configuration
      output.logstash:
        hosts: ["logstash:5044"]
      # Logstash processing
      filter {
        if [agent][type] == "metricbeat" {
          # Metric-specific processing
        }
      }
```
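A single Beats listener often serves several shippers, so routing is usually done downstream. The sketch below is an assumption-laden illustration: the field used to identify the shipper differs between Beats versions (`[@metadata][beat]` vs `[agent][type]`), and the cluster address and index prefixes are placeholders.

```conf
# Sketch: one Beats listener, per-shipper routing computed in the filter stage.
input {
  beats { port => 5044 }
}

filter {
  if [agent][type] == "metricbeat" {                                # field name varies by Beats version
    mutate { add_field => { "[@metadata][target_index]" => "metrics" } }
  } else {
    mutate { add_field => { "[@metadata][target_index]" => "logs" } }
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]                              # assumed cluster address
    index => "%{[@metadata][target_index]}-%{+YYYY.MM.dd}"          # @metadata itself is never indexed
  }
}
```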
```yaml
file_input:
  basic_monitoring: |
    input {
      file {
        path => ["/var/log/*.log", "/var/log/*/*.log"]
        start_position => "beginning"
        sincedb_path => "/var/lib/logstash/sincedb"
        stat_interval => 1
        discover_interval => 15
      }
    }
  advanced_features:
    multiline_handling: |
      input {
        file {
          path => "/var/log/application.log"
          codec => multiline {
            pattern => "^%{TIMESTAMP_ISO8601}"
            negate => true
            what => "previous"
          }
        }
      }
    file_rotation_handling: |
      input {
        file {
          path => "/var/log/app/*.log"
          # Handling rotated files
          ignore_older => 86400    # ignore files older than one day
          close_older => 3600      # close file handles after one hour
          max_open_files => 4096   # maximum number of open files
        }
      }
    exclude_patterns: |
      input {
        file {
          path => "/var/log/**/*.log"
          # Exclude compressed files and other unwanted patterns
          exclude => ["*.gz", "*debug*", "*temp*"]
        }
      }
  performance_considerations:
    large_file_handling:
      - "Set a reasonable stat_interval"
      - "Use exclude to reduce file scanning"
      - "Place the sincedb file on fast local storage"
      - "Control max_open_files"
    network_storage:
      - "Avoid watching network file systems"
      - "Use local caching"
      - "Beware of file-locking issues"
```
```yaml
other_inputs:
  http_input: |
    input {
      http {
        host => "0.0.0.0"
        port => 8080
        user => "logstash"
        password => "secret"
        # SSL configuration
        ssl => true
        ssl_certificate => "/path/to/cert.pem"
        ssl_key => "/path/to/key.pem"
        # Request handling
        additional_codecs => {"application/json" => "json"}
        response_headers => {
          "Access-Control-Allow-Origin" => "*"
          "Content-Type" => "application/json"
        }
      }
    }
  kafka_input: |
    input {
      kafka {
        bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
        topics => ["logs", "metrics", "events"]
        group_id => "logstash-consumers"
        # Performance settings
        consumer_threads => 4
        fetch_min_bytes => 1024
        max_poll_records => 500
        # Security settings
        security_protocol => "SASL_SSL"
        sasl_mechanism => "PLAIN"
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='user' password='password';"
      }
    }
  jdbc_input: |
    input {
      jdbc {
        jdbc_driver_library => "/path/to/mysql-connector.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://db:3306/logs"
        jdbc_user => "logstash"
        jdbc_password => "password"
        # Query settings
        statement => "SELECT * FROM application_logs WHERE updated_at > :sql_last_value"
        schedule => "*/5 * * * *" # run every five minutes
        use_column_value => true
        tracking_column => "updated_at"
        tracking_column_type => "timestamp"
      }
    }
```
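As a complement to the Kafka input above, it can be useful to keep the source topic with each event so that the filter stage can branch on it. The sketch below is an assumption: the exact value accepted by `decorate_events` depends on the plugin version (boolean in older versions, `"basic"`/`"extended"` in newer ones), and the topic names are placeholders.

```conf
# Sketch: consume from Kafka and keep the source topic in @metadata for routing.
input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topics => ["logs", "metrics"]
    group_id => "logstash-consumers"
    codec => json
    decorate_events => true          # newer plugin versions use "basic" instead of true
  }
}

filter {
  # [@metadata][kafka][topic] is populated when decorate_events is enabled
  if [@metadata][kafka][topic] == "metrics" {
    mutate { add_tag => ["metric_event"] }
  }
}

output {
  stdout { codec => rubydebug }
}
```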
Filter Plugins in Detail
```yaml
grok_filter:
  pattern_matching: |
    filter {
      grok {
        match => {
          "message" => "%{COMBINEDAPACHELOG}"
        }
        # Custom patterns
        patterns_dir => ["/etc/logstash/patterns"]
        # Failure handling
        tag_on_failure => ["_grokparsefailure"]
        # Performance safeguard
        timeout_millis => 30000
      }
    }
  custom_patterns: |
    # patterns/custom_patterns
    CUSTOM_TIMESTAMP %{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME}
    APPLICATION_LOG %{CUSTOM_TIMESTAMP:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:message}
    # Using the custom patterns
    filter {
      grok {
        patterns_dir => ["/etc/logstash/patterns"]
        match => { "message" => "%{APPLICATION_LOG}" }
      }
    }
  complex_parsing: |
    filter {
      grok {
        # Multiple candidate patterns
        match => {
          "message" => [
            "%{COMBINEDAPACHELOG}",
            "%{COMMONAPACHELOG}",
            "%{GREEDYDATA:unparsed}"
          ]
        }
        # Stop at the first match
        break_on_match => true
        # Keep only named captures
        named_captures_only => true
      }
    }
  performance_optimization:
    pattern_efficiency:
      - "Use the most specific pattern possible"
      - "Avoid greedy matches"
      - "Precompile patterns"
      - "Set a reasonable timeout"
    caching_strategies:
      - "Pattern compilation caching"
      - "Result caching"
      - "Warm up frequently used patterns"
```
```yaml
mutate_filter:
  field_operations: |
    filter {
      mutate {
        # Add fields
        add_field => {
          "environment" => "production"
          "datacenter" => "us-west-1"
        }
        # Rename fields
        rename => {
          "old_field" => "new_field"
          "timestamp" => "@timestamp"
        }
        # Copy fields
        copy => { "message" => "message_backup" }
        # Remove fields
        remove_field => ["temp_field", "debug_info"]
        # Replace field contents
        replace => { "message" => "Processed: %{message}" }
        # Update only if the field already exists
        update => { "status" => "processed" }
      }
    }
  data_type_conversion: |
    filter {
      mutate {
        # Type conversion
        convert => {
          "response_code" => "integer"
          "response_time" => "float"
          "timestamp" => "string"
          "tags" => "array"
        }
        # String operations
        lowercase => ["username", "email"]
        uppercase => ["country_code"]
        capitalize => ["user_name"]
        # Text clean-up
        strip => ["message"]
        gsub => [
          "message", "\n", " ",    # replace newlines with spaces
          "phone", "[^\d]", ""     # keep digits only
        ]
        # Split and join fields
        split => { "tags" => "," }
        join => { "errors" => "; " }
      }
    }
  conditional_processing: |
    filter {
      if [log_level] == "ERROR" {
        mutate {
          add_tag => ["error", "high_priority"]
          add_field => { "alert_required" => "true" }
        }
      } else if [log_level] == "WARN" {
        mutate {
          add_tag => ["warning"]
        }
      }
      # Check whether a field exists
      if [user_id] {
        mutate {
          add_field => { "has_user_context" => "true" }
        }
      }
    }
```
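Putting mutate operations together with a conditional drop is a common way to keep events lean before the output stage. The sketch below is illustrative: the field names and the one-second latency threshold are assumptions.

```conf
# Sketch: normalize types, discard noisy debug events, and strip scratch fields.
filter {
  if [log_level] == "DEBUG" {
    drop { }                                     # debug noise never reaches the outputs
  }

  mutate {
    convert      => { "response_time" => "float" }
    lowercase    => ["log_level"]
    remove_field => ["raw_message", "temp"]      # assumed scratch fields from earlier stages
  }

  if [response_time] and [response_time] > 1.0 {
    mutate { add_tag => ["slow_request"] }       # assumed threshold of one second
  }
}
```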
```yaml
advanced_filters:
  date_parsing: |
    filter {
      date {
        match => [
          "timestamp",
          "dd/MMM/yyyy:HH:mm:ss Z",
          "yyyy-MM-dd HH:mm:ss",
          "ISO8601"
        ]
        # Time zone handling
        timezone => "UTC"
        target => "@timestamp"
        # Failure handling
        tag_on_failure => ["_dateparsefailure"]
      }
    }
  geoip_enrichment: |
    filter {
      geoip {
        source => "client_ip"
        target => "geoip"
        # Database configuration
        database => "/etc/logstash/GeoLite2-City.mmdb"
        # Field selection
        fields => ["city_name", "country_name", "latitude", "longitude"]
        # Cache configuration
        cache_size => 1000
      }
    }
  user_agent_parsing: |
    filter {
      useragent {
        source => "user_agent"
        target => "ua"
        # Fields to extract
        fields => ["name", "version", "os", "os_version", "device"]
      }
    }
  json_processing: |
    filter {
      json {
        source => "message"
        target => "parsed"
        # Error handling
        tag_on_failure => ["_jsonparsefailure"]
        # Skip content that is not valid JSON
        skip_on_invalid_json => true
      }
      # Extract a nested field
      if [parsed][nested][field] {
        mutate {
          add_field => { "extracted_value" => "%{[parsed][nested][field]}" }
        }
      }
    }
  ruby_scripting: |
    filter {
      ruby {
        code => '
          # Custom processing logic
          timestamp = event.get("@timestamp")
          hour = timestamp.hour
          if hour >= 9 && hour <= 17
            event.set("business_hours", true)
          else
            event.set("business_hours", false)
          end
          # More involved calculations
          if event.get("response_time")
            response_time = event.get("response_time").to_f
            if response_time > 1000
              event.set("slow_request", true)
              event.tag("performance_issue")
            end
          end
        '
      }
    }
```
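The filters above are often chained: parse JSON, promote the timestamp, enrich, and then discard the raw line. The sketch below is an illustration only; the source field names (`ts`, `client_ip`, `ua`) are assumptions.

```conf
# Sketch: turn a raw JSON application log into a normalized, enriched event.
filter {
  json {
    source => "message"
    tag_on_failure => ["_jsonparsefailure"]
  }

  date {
    match  => [ "ts", "ISO8601" ]            # promote the application timestamp (assumed field)
    target => "@timestamp"
  }

  geoip {
    source => "client_ip"                    # assumed client address field
    target => "geoip"
  }

  useragent {
    source => "ua"                           # assumed user-agent field
    target => "user_agent_parsed"
  }

  mutate {
    remove_field => ["message", "ts"]        # the raw line is no longer needed once parsed
  }
}
```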
Output Plugins in Detail
Output plugin configuration
```yaml
output_plugins:
  elasticsearch_output:
    basic_configuration: |
      output {
        elasticsearch {
          hosts => ["es1:9200", "es2:9200", "es3:9200"]
          index => "logs-%{+YYYY.MM.dd}"
          # Document settings
          document_type => "_doc"   # deprecated; only relevant for older Elasticsearch versions
          document_id => "%{[@metadata][fingerprint]}"
          # Template management
          template_name => "logs"
          template => "/etc/logstash/templates/logs.json"
          template_overwrite => true
        }
      }
    performance_optimization: |
      output {
        elasticsearch {
          hosts => ["es1:9200", "es2:9200", "es3:9200"]
          # Batch settings (removed in recent plugin versions; batching now follows pipeline.batch.size)
          flush_size => 500
          idle_flush_time => 1
          # Concurrency
          workers => 2
          # Retry settings
          retry_max_interval => 64
          retry_initial_interval => 2
          # Health checks
          healthcheck_interval => 60
          resurrect_delay => 5
          # Connection pool
          pool_max => 1000
          pool_max_per_route => 100
        }
      }
    conditional_routing: |
      output {
        if [log_level] == "ERROR" {
          elasticsearch {
            hosts => ["es-errors:9200"]
            index => "errors-%{+YYYY.MM.dd}"
          }
        } else {
          elasticsearch {
            hosts => ["es-main:9200"]
            index => "logs-%{+YYYY.MM.dd}"
          }
        }
      }
  kafka_output: |
    output {
      kafka {
        bootstrap_servers => "kafka1:9092,kafka2:9092"
        topic_id => "processed-logs"
        # Partitioning (producer side is driven by the message key and the partitioner setting)
        partitioner => "default"
        # Performance settings
        batch_size => 16384
        linger_ms => 5
        compression_type => "snappy"
        # Reliability settings
        acks => "all"
        retries => 3
        max_in_flight_requests_per_connection => 1
        # Message format
        codec => json
        key_serializer => "org.apache.kafka.common.serialization.StringSerializer"
        value_serializer => "org.apache.kafka.common.serialization.StringSerializer"
      }
    }
  file_output: |
    output {
      file {
        path => "/var/log/processed/logstash-%{+YYYY-MM-dd}.log"
        # Line format
        codec => line { format => "%{@timestamp} %{level} %{message}" }
        # File management
        file_extend_every_kb => 10000
        flush_interval => 0 # flush immediately
        gzip => true        # compress output
      }
    }
  http_output: |
    output {
      http {
        url => "https://api.external-service.com/logs"
        http_method => "post"
        # Authentication
        headers => {
          "Authorization" => "Bearer %{api_token}"
          "Content-Type" => "application/json"
        }
        # Batching
        batch => true
        batch_events => 50
        batch_timeout => 5
        # Retry settings
        retry_non_idempotent => true
        retryable_codes => [429, 502, 503, 504]
        retry_failed => true
        # Payload mapping
        mapping => {
          "timestamp" => "%{@timestamp}"
          "level" => "%{level}"
          "message" => "%{message}"
        }
      }
    }
  conditional_outputs: |
    output {
      # Route by tag
      if "error" in [tags] {
        elasticsearch {
          hosts => ["es-errors:9200"]
          index => "errors-%{+YYYY.MM.dd}"
        }
        email {
          to => "alerts@company.com"
          subject => "Error Alert: %{service}"
          body => "Error occurred: %{message}"
        }
      }
      # Route by field value
      if [service] == "payment" {
        kafka {
          topic_id => "payment-logs"
          bootstrap_servers => "kafka:9092"
        }
      }
      # Default output
      elasticsearch {
        hosts => ["es-main:9200"]
        index => "logs-%{+YYYY.MM.dd}"
      }
    }
  monitoring_outputs:
    metrics_output: |
      output {
        statsd {
          host => "statsd.monitoring.local"
          port => 8125
          # Metric definitions
          gauge => {
            "logstash.events.processed" => "1"
            "logstash.pipeline.%{[@metadata][pipeline]}.events" => "1"
          }
          increment => [
            "logstash.events.total",
            "logstash.pipeline.%{[@metadata][pipeline]}.total"
          ]
          timing => {
            "logstash.processing.duration" => "%{[@metadata][processing_time]}"
          }
        }
      }
    dead_letter_queue: |
      # Handle failed events automatically
      output {
        if "_failed" in [tags] {
          file {
            path => "/var/log/logstash/failed-events-%{+YYYY-MM-dd}.log"
            codec => json
          }
        }
      }
```
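One pattern that keeps conditional outputs manageable is computing the routing decision once in the filter stage and referencing it from a single elasticsearch block. A sketch follows; the index prefixes and cluster address are assumptions.

```conf
# Sketch: compute the target index in @metadata and keep a single elasticsearch output.
filter {
  if "error" in [tags] {
    mutate { add_field => { "[@metadata][index_prefix]" => "errors" } }
  } else if [service] == "payment" {
    mutate { add_field => { "[@metadata][index_prefix]" => "payment-logs" } }
  } else {
    mutate { add_field => { "[@metadata][index_prefix]" => "logs" } }
  }
}

output {
  elasticsearch {
    hosts => ["es-main:9200"]
    index => "%{[@metadata][index_prefix]}-%{+YYYY.MM.dd}"
  }
}
```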
📊 Performance Optimization and Monitoring
Performance Tuning Strategies
```yaml
performance_tuning:
  jvm_optimization:
    heap_sizing:
      recommendation: "50-75% of physical memory"
      minimum: "1GB"
      maximum: "31GB (to stay within the compressed-OOPs limit)"
      configuration: |
        # jvm.options
        -Xms8g
        -Xmx8g
        # GC tuning
        -XX:+UseG1GC
        -XX:MaxGCPauseMillis=200
        -XX:+DisableExplicitGC
        # Monitoring and debugging
        -XX:+HeapDumpOnOutOfMemoryError
        -XX:HeapDumpPath=/var/log/logstash
    garbage_collection:
      g1gc_tuning: |
        # G1GC-specific settings
        -XX:+UseG1GC
        -XX:MaxGCPauseMillis=200
        -XX:G1HeapRegionSize=16m
        -XX:+G1UseAdaptiveIHOP
        -XX:G1MixedGCCountTarget=8
        -XX:G1OldCSetRegionThreshold=10
      monitoring_gc: |
        # GC logging (JDK 8 style flags)
        -XX:+PrintGC
        -XX:+PrintGCDetails
        -XX:+PrintGCTimeStamps
        -Xloggc:/var/log/logstash/gc.log
        -XX:+UseGCLogFileRotation
        -XX:NumberOfGCLogFiles=10
        -XX:GCLogFileSize=10M
  pipeline_optimization:
    worker_configuration:
      calculation: "pipeline.workers = number of CPU cores"
      considerations:
        - "I/O-bound pipelines can use more workers"
        - "CPU-bound pipelines should stay at the core count"
        - "Memory limits constrain the total"
        - "Downstream system performance matters"
      tuning_examples: |
        # logstash.yml
        pipeline.workers: 8
        pipeline.batch.size: 125
        pipeline.batch.delay: 50
        # Scenario-specific settings
        # High-throughput scenario
        pipeline.batch.size: 1000
        pipeline.batch.delay: 5
        # Low-latency scenario
        pipeline.batch.size: 50
        pipeline.batch.delay: 1
    memory_management:
      queue_configuration: |
        # In-memory queue (default)
        queue.type: memory
        queue.page_capacity: 64mb
        queue.max_events: 0
        # Persistent queue
        queue.type: persisted
        queue.max_bytes: 1024mb
        queue.page_capacity: 64mb
        queue.checkpoint.acks: 1024
        queue.checkpoint.writes: 1024
      memory_monitoring:
        - "Heap usage percentage"
        - "GC frequency and duration"
        - "Queue size"
        - "Event processing rate"
```
```yaml
plugin_optimization:
  grok_optimization:
    pattern_efficiency:
      best_practices:
        - "Use the most specific pattern possible"
        - "Avoid greedy .* matches"
        - "Precompile frequently used patterns"
        - "Set a timeout"
      performance_patterns: |
        # Efficient pattern
        %{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{DATA:message}
        # Inefficient patterns to avoid
        %{DATA}.*%{GREEDYDATA}   # multiple greedy matches
        .*some_pattern.*         # greedy matches on both sides
    caching_strategy: |
      grok {
        patterns_dir => ["/etc/logstash/patterns"]
        keep_empty_captures => false
        named_captures_only => true
        timeout_millis => 30000
      }
  filter_optimization:
    conditional_processing:
      efficient_conditions: |
        # Efficient condition checks
        if [log_level] == "ERROR" {
          # handling logic
        } else if [log_level] in ["WARN", "INFO"] {
          # other handling
        }
        # Prefer simple checks over complex regular expressions
        if [message] =~ /^ERROR/ {   # simple prefix match
          # cheaper than a complex regular expression
        }
    filter_ordering:
      principles:
        - "Put cheap filters first"
        - "Put high-probability matches first"
        - "Do heavy processing last"
        - "Handle failure-prone parsing separately"
    field_management:
      memory_efficiency: |
        filter {
          # Drop unneeded fields as early as possible
          mutate {
            remove_field => ["temp", "debug", "raw_message"]
          }
          # Conditional field handling
          if ![important_field] {
            drop { }   # discard events that carry no value
          }
        }
  output_optimization:
    elasticsearch_tuning: |
      elasticsearch {
        # Batch tuning (legacy options on older plugin versions)
        flush_size => 500
        idle_flush_time => 1
        # Concurrency
        workers => 2
        # Connection pool
        pool_max => 1000
        pool_max_per_route => 100
        # Template management
        manage_template => false # manage templates manually
        # Retry policy
        retry_max_interval => 64
      }
    batching_strategies:
      size_based: "Batch by event count"
      time_based: "Batch by time interval"
      memory_based: "Batch by memory usage"
      adaptive: "Adaptive batch adjustment"
```
Monitoring and Alerting
Monitoring metrics and alerting
```yaml
monitoring_and_alerting:
  key_metrics:
    pipeline_metrics:
      throughput:
        - "events.in: input event rate"
        - "events.out: output event rate"
        - "events.filtered: filtered event rate"
        - "events.duration_in_millis: processing latency"
      queue_metrics:
        - "queue.events: events in the queue"
        - "queue.size_in_bytes: queue size"
        - "queue.max_size_in_bytes: maximum queue size"
        - "queue.capacity.page_capacity_in_bytes: page capacity"
      error_metrics:
        - "events.failed: failed events"
        - "pipeline.reloads.failures: reload failures"
        - "dead_letter_queue.dropped_events: dropped events"
    jvm_metrics:
      memory:
        - "jvm.mem.heap_used_percent: heap usage"
        - "jvm.mem.heap_committed_in_bytes: committed heap"
        - "jvm.mem.non_heap_used_in_bytes: non-heap usage"
      garbage_collection:
        - "jvm.gc.collectors.young.collection_time_in_millis: young-generation GC time"
        - "jvm.gc.collectors.old.collection_time_in_millis: old-generation GC time"
        - "jvm.gc.collectors.young.collection_count: GC count"
      threads:
        - "jvm.threads.count: thread count"
        - "jvm.threads.peak_count: peak thread count"
    system_metrics:
      process:
        - "process.cpu.percent: CPU usage"
        - "process.mem.total_virtual_in_bytes: virtual memory"
        - "process.open_file_descriptors: open file descriptors"
      os:
        - "os.cpu.load_average.1m: one-minute load average"
        - "os.mem.actual_used_in_bytes: system memory in use"
  monitoring_setup:
    api_monitoring: |
      # Node statistics
      curl -X GET "localhost:9600/_node/stats"
      # Pipeline statistics
      curl -X GET "localhost:9600/_node/stats/pipelines"
      # JVM information
      curl -X GET "localhost:9600/_node/stats/jvm"
      # Hot-thread analysis
      curl -X GET "localhost:9600/_node/hot_threads"
    prometheus_integration: |
      # Install the (community) monitoring plugin
      bin/logstash-plugin install logstash-output-prometheus
      # Configure the monitoring output
      output {
        prometheus {
          increment => ["logstash_events_total"]
          histogram => ["logstash_event_duration_seconds"]
          gauge => ["logstash_queue_events"]
        }
      }
    log_monitoring: |
      # Logging settings in logstash.yml
      log.level: info
      log.format: json
      path.logs: /var/log/logstash
      # Log rotation is configured through Log4j2 (log4j2.properties), not logstash.yml
  alerting_rules:
    critical_alerts:
      pipeline_stopped:
        condition: "events.in == 0 for 5 minutes"
        action: "Restart the pipeline or the Logstash instance"
      high_error_rate:
        condition: "events.failed / events.in > 0.1"
        action: "Check the configuration and the input data"
      memory_exhaustion:
        condition: "jvm.mem.heap_used_percent > 90%"
        action: "Increase the heap or optimize the configuration"
      queue_backlog:
        condition: "queue.events > threshold for 10 minutes"
        action: "Check downstream system performance"
    warning_alerts:
      performance_degradation:
        condition: "events.duration_in_millis > baseline * 2"
        investigation: "Analyze the cause of the processing latency"
      gc_pressure:
        condition: "gc_time / uptime > 0.1"
        tuning: "Tune the GC parameters"
      high_cpu_usage:
        condition: "process.cpu.percent > 80%"
        scaling: "Consider horizontal scaling"
    capacity_alerts:
      disk_space_low:
        condition: "disk_usage > 85%"
        action: "Clean up logs or add capacity"
      file_descriptors_high:
        condition: "open_file_descriptors > 80% of limit"
        tuning: "Raise the ulimit or optimize file handling"
  troubleshooting_guide:
    common_issues:
      performance_problems:
        slow_processing:
          diagnosis:
            - "Check pipeline statistics"
            - "Analyze GC activity"
            - "Assess filter complexity"
            - "Monitor downstream system performance"
          solutions:
            - "Increase pipeline workers"
            - "Optimize grok patterns"
            - "Adjust the batch size"
            - "Use the persistent queue"
        memory_leaks:
          symptoms:
            - "Heap usage keeps growing"
            - "Frequent full GCs"
            - "OutOfMemoryError"
          investigation:
            - "Heap-dump analysis"
            - "GC log analysis"
            - "Monitor field data"
            - "Review plugin configuration"
      configuration_issues:
        grok_failures:
          debugging: |
            # Enable verbose debugging
            filter {
              grok {
                match => { "message" => "%{PATTERN}" }
                tag_on_failure => ["_grokparsefailure"]
                add_field => { "debug_message" => "%{message}" }
              }
            }
          testing: |
            # Use the Grok debugger, or feed a sample line through a test pipeline
            echo "sample log line" | bin/logstash -f test.conf
        output_failures:
          elasticsearch_errors:
            - "Check the Elasticsearch cluster status"
            - "Verify the index template"
            - "Monitor network connectivity"
            - "Check authentication settings"
          dead_letter_queue: |
            # Enable the DLQ (logstash.yml)
            dead_letter_queue.enable: true
            dead_letter_queue.max_bytes: 1024mb
            # Reprocess failed events
            input {
              dead_letter_queue {
                path => "/var/lib/logstash/dead_letter_queue"
                start_timestamp => "2024-01-01T00:00:00"
              }
            }
```
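To keep parse failures from silently disappearing, a pipeline can route events carrying the failure tags discussed above into their own index for later inspection. The sketch below is illustrative; the index names and cluster address are assumptions.

```conf
# Sketch: quarantine unparsable events instead of mixing them with clean data.
output {
  if "_grokparsefailure" in [tags] or "_dateparsefailure" in [tags] {
    elasticsearch {
      hosts => ["es-main:9200"]
      index => "parse-failures-%{+YYYY.MM.dd}"    # assumed quarantine index
    }
  } else {
    elasticsearch {
      hosts => ["es-main:9200"]
      index => "logs-%{+YYYY.MM.dd}"
    }
  }
}
```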
📋 Logstash Interview Essentials
Fundamental concepts
How does Logstash work, and what is its architecture?
- Input → Filter → Output pipeline model
- Event-driven processing
- Parallel processing across multiple pipelines
- In-memory queue versus persistent queue
How do Logstash and Beats relate?
- Beats are lightweight data shippers
- Logstash is the data processing engine
- Best practices for using them together
- Choosing between shipping directly and pre-processing
How does grok pattern matching work?
- A wrapper around regular expressions
- The library of predefined patterns
- Defining custom patterns
- Performance optimization techniques
Configuration practice
How do you design an efficient filter pipeline?
- Filter execution order
- Optimizing conditionals
- Best practices for field operations
- Error-handling strategy
How do you design a multi-pipeline architecture?
- Separation by data source
- Separation by processing complexity
- Pipeline-to-pipeline communication
- Resource isolation strategy
How do you handle complex log formats?
- Multiline log aggregation
- Nested JSON parsing
- Dynamic field extraction
- Data type conversion
Performance optimization
What are the key points of Logstash performance tuning?
- JVM memory configuration
- Adjusting pipeline worker threads
- Batch size optimization
- Plugin-level optimization
How do you handle high-throughput scenarios?
- Horizontal scaling strategy
- Load-balancing configuration
- Buffering and queue optimization
- Coordinating with downstream systems
How do you optimize memory usage?
- Heap sizing
- Field data management
- GC tuning strategy
- Diagnosing memory leaks
🔗 Related Content
- ELK Stack Overview - overall architecture and technology stack
- Elasticsearch Cluster - data storage and search
- Kibana Visualization - data visualization and analysis
- Log Management Fundamentals - log management architecture design
As the data processing core of the ELK stack, Logstash standardizes complex log data through its flexible plugin architecture and powerful processing capabilities. Mastering its configuration and tuning is key to building an efficient log management system.
