61 Logstash Integration mit OpenSearch

Logstash ist ein leistungsfähiges Datenverarbeitungs-Tool, das sich nahtlos mit OpenSearch integrieren lässt. In diesem Kapitel lernen Sie, wie Sie Logstash effektiv mit OpenSearch einsetzen können.

61.1 Grundlegende Konfiguration

61.1.1 Installation und Basis-Setup

Zunächst benötigen Sie das OpenSearch Output Plugin für Logstash:

# Install the OpenSearch output plugin into this Logstash installation
# (relative path: run from the Logstash home directory)
bin/logstash-plugin install logstash-output-opensearch

61.1.2 JVM-Einstellungen optimieren

# config/jvm.options
# Initial and maximum heap are set to the same value so the heap is not resized at runtime;
# size them to the host, 2g is only the example value used here
-Xms2g
-Xmx2g
# G1 collector with a larger reserve and an earlier concurrent-cycle trigger than the JVM defaults
-XX:+UseG1GC
-XX:G1ReservePercent=25
-XX:InitiatingHeapOccupancyPercent=75

61.1.3 Basis-Pipeline für OpenSearch

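input {
  file {
    path => "/var/log/application/*.log"
    # Read existing files from the start on the first run; sincedb records the position afterwards
    start_position => "beginning"
    sincedb_path => "/var/lib/logstash/sincedb"
    type => "application-logs"
  }
}

filter {
  # Expected line layout: "<ISO8601 timestamp> <LEVEL> [<service>] <trace_id> - <message>"
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \[%{DATA:service}\] %{DATA:trace_id} - %{GREEDYDATA:log_message}" }
  }
  
  # Use the timestamp parsed from the line as @timestamp instead of the ingestion time
  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }
  
  mutate {
    add_field => {
      "environment" => "production"
      "datacenter" => "eu-central"
    }
  }
}

output {
  opensearch {
    hosts => ["https://opensearch:9200"]
    # Credentials come from the environment; a literal admin/admin pair in the pipeline file
    # is both a secret in version control and the well-known demo default
    user => "${OPENSEARCH_USER}"
    password => "${OPENSEARCH_PASSWORD}"
    ssl => true
    # Verify the server certificate against the CA; with verification off, any endpoint
    # that answers on the host name is accepted
    ssl_certificate_verification => true
    cacert => "/etc/logstash/ssl/ca.crt"
    index => "logs-%{+YYYY.MM.dd}"
    
    # OpenSearch-specific settings
    template_name => "logs"
    template_overwrite => true
    
    # Bulk settings
    bulk_size => 1000
    bulk_timeout => "5s"
  }
}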

61.2 Erweiterte Input-Konfigurationen

61.2.1 Beats Input

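input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/ssl/logstash.crt"
    ssl_key => "/etc/logstash/ssl/logstash.key"
    # Clients must present a certificate ("force_peer") and it is validated against these CAs;
    # without a CA list there is nothing to validate the client certificate against
    ssl_certificate_authorities => ["/etc/logstash/ssl/ca.crt"]
    ssl_verify_mode => "force_peer"
  }
}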

61.2.2 Kafka Input

input {
  kafka {
    # NOTE(review): no security_protocol is set, so the plugin default (PLAINTEXT: unencrypted,
    # unauthenticated) applies — use SSL or SASL_SSL outside test setups
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topics => ["logs", "metrics"]
    # Consumer-group id shared by all Logstash instances that should split the partitions
    group_id => "logstash_consumers"
    client_id => "logstash_client"
    # One Kafka consumer per thread
    consumer_threads => 4
    # Messages are JSON documents
    codec => json
    # Adds Kafka metadata (topic, partition, offset, ...) to each event under [@metadata][kafka]
    decorate_events => true
    # Kafka consumer tuning: records per poll, and the maximum gap between polls before the
    # consumer is treated as failed (both given as strings, as the plugin expects)
    max_poll_records => "5000"
    max_poll_interval_ms => "300000"
  }
}

61.2.3 HTTP Input für API-Logs

input {
  # HTTPS listener for applications that POST their logs directly
  http {
    port => 8080
    codec => json
    ssl => true
    ssl_certificate => "/etc/logstash/ssl/logstash.crt"
    ssl_key => "/etc/logstash/ssl/logstash.key"
    # NOTE(review): no user/password and no client-certificate requirement are configured,
    # so anything that can reach the port can submit events — add authentication for non-test use
    # Bodies with Content-Type application/log are decoded as plain text instead of JSON
    additional_codecs => {
      "application/log" => "plain"
    }
  }
}

61.3 Komplexe Filter-Konfigurationen

61.3.1 Log-Parsing und Anreicherung

filter {
  if [type] == "application-logs" {
    # CUSTOM_LOG_FORMAT is a user-defined pattern loaded from patterns_dir
    grok {
      patterns_dir => ["/etc/logstash/patterns"]
      match => { "message" => "%{CUSTOM_LOG_FORMAT}" }
      # Events the pattern does not match are tagged rather than dropped (this is also the default tag)
      tag_on_failure => ["_grokparsefailure"]
    }
    
    # Grok captures are strings; convert the numeric ones so they are indexed as numbers
    mutate {
      convert => {
        "response_time" => "float"
        "bytes" => "integer"
      }
    }
    
    # 4xx and 5xx status codes are flagged as HTTP errors
    if [status] =~ /^[45]/ {
      mutate {
        add_field => { "error_type" => "http_error" }
      }
    }
  }
  
  # Seconds between the event's own @timestamp and the moment it passes this filter
  ruby {
    code => '
      event.set("processing_time", 
        Time.now.to_f - event.get("@timestamp").to_f
      )
    '
  }
  
  # Look up user_id in a YAML dictionary and store the mapped value as user_group;
  # the dictionary file is re-read every 300 s, so edits apply without a restart
  translate {
    field => "user_id"
    destination => "user_group"
    dictionary_path => "/etc/logstash/dictionaries/users.yml"
    refresh_interval => 300
  }
}

61.3.2 Metriken-Aggregation

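filter {
  # Rolling per-task aggregate keyed by the event's "id" field.
  # NOTE(review): if an event has no "id" field, "%{id}" stays literal and all such events
  # share a single map — confirm the field is always present
  aggregate {
    task_id => "%{id}"
    code => "
      map['requests'] ||= 0
      map['requests'] += 1
      map['total_time'] ||= 0
      # A missing response_time would raise inside the filter (nil cannot be added); count it as 0
      map['total_time'] += (event.get('response_time') || 0)
      
      # Drop events until the task has seen 10 requests; later events carry the running average
      event.cancel if map['requests'] < 10
      
      event.set('avg_response_time', map['total_time'] / map['requests'])
    "
    # Maps idle for 120 s are expired; when a new task id appears, the previous map is emitted as its own event
    timeout => 120
    push_previous_map_as_event => true
  }
  
  metrics {
    # Event rate of "requests" plus the current values of the two gauges and a timer pair
    meter => ["requests"]
    gauge => ["response_time", "processing_time"]
    timer => ["request_duration", "%{response_time}"]
    add_tag => "metrics"
  }
}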

61.4 OpenSearch Output Optimierung

61.4.1 Bulk-Indexierung und Fehlerbehandlung

output {
  opensearch {
    hosts => ["https://opensearch:9200"]
    # Credentials are read from the environment, not stored in the pipeline file
    user => "${OPENSEARCH_USER}"
    password => "${OPENSEARCH_PASSWORD}"
    
    # Index settings: one index per month; the index template is installed from the file
    # below and overwritten on every start
    index => "logs-%{+YYYY.MM}"
    manage_template => true
    template => "/etc/logstash/templates/logs-template.json"
    template_name => "logs"
    template_overwrite => true
    
    # Bulk settings
    # NOTE(review): confirm bulk_size/bulk_timeout against the installed plugin version's option list
    bulk_size => 5000
    bulk_timeout => "3s"
    
    # Retry settings: back-off grows from 2 s up to 64 s; retry_on_conflict applies to update actions
    retry_initial_interval => "2s"
    retry_max_interval => "64s"
    retry_on_conflict => 5
    
    # HTTP connection-pool limits
    pool_max => 4
    pool_max_per_route => 2
    
    # Request timeout (seconds)
    timeout => 120
    
    # SSL/TLS: verify the server certificate against the CA
    ssl => true
    ssl_certificate_verification => true
    cacert => "/etc/logstash/ssl/ca.crt"
  }
}

61.4.2 Dead Letter Queue (DLQ) Konfiguration

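# config/logstash.yml
# The dead-letter queue is a node-level setting: all dead_letter_queue.* keys belong here.
# The pipeline DSL below does not accept "key: value" lines inside a plugin block.
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1gb
path.dead_letter_queue: "/var/lib/logstash/dlq"

# Pipeline configuration
output {
  opensearch {
    # ... OpenSearch configuration (hosts, credentials, TLS, index) ...
    
    action => "index"
    # NOTE(review): failure_type could not be confirmed as an option of logstash-output-opensearch;
    # verify against the installed plugin version before relying on it
    failure_type => ["all"]
  }
}
# NOTE(review): events placed in the DLQ are not re-sent automatically; a separate pipeline
# with a dead_letter_queue input is needed to inspect or reprocess them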

61.5 Monitoring und Performance

61.5.1 Pipeline-Monitoring aktivieren

# config/logstash.yml
# Ship Logstash's own pipeline metrics to a separate monitoring cluster
# NOTE(review): verify the monitoring.* key names against the installed Logstash/OpenSearch version
monitoring.enabled: true
monitoring.opensearch.hosts: ["https://monitoring-opensearch:9200"]
monitoring.opensearch.username: "logstash_system"
# Password is taken from the environment rather than written into the file
monitoring.opensearch.password: "${MONITORING_PASSWORD}"

61.5.2 Performance-Tuning

# config/logstash.yml
# Worker threads per pipeline (the default is the number of CPU cores)
pipeline.workers: 4
# Events per worker batch and maximum wait in ms before a partial batch is flushed;
# the two values given here are the documented defaults
pipeline.batch.size: 125
pipeline.batch.delay: 50
# Disk-backed queue so in-flight events survive a restart; capped at 1 GB per pipeline
queue.type: persisted
queue.max_bytes: 1gb

# Pipeline-specific settings
input {
  # NOTE(review): no ssl settings here — this listener accepts plain TCP, unlike the TLS
  # Beats example earlier in the chapter
  beats {
    port => 5044
    # Close client connections that send nothing for 60 s
    client_inactivity_timeout => 60
    receive_buffer_bytes => 33554432
  }
}

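filter {
  ruby {
    # The script is loaded from a file; "code" and "path" are mutually exclusive in the ruby
    # filter, so only "path" is given here
    path => "/etc/logstash/scripts/custom_processing.rb"
    # Passed to the script's register(params) hook
    script_params => { "cache_size" => 10000 }
  }
}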

61.6 Sicherheit und SSL/TLS

61.6.1 Zertifikatsbasierte Authentifizierung

output {
  # Mutual TLS: Logstash authenticates with its own certificate instead of a username/password
  opensearch {
    hosts => ["https://opensearch:9200"]
    ssl => true
    # CA used to verify the OpenSearch server certificate
    cacert => "/etc/logstash/ssl/ca.crt"
    # Client certificate and private key presented to OpenSearch
    # NOTE(review): check these option names against the installed plugin version; older
    # versions configure the client identity through a keystore instead
    certificate => "/etc/logstash/ssl/logstash.crt"
    key => "/etc/logstash/ssl/logstash.key"
    ssl_certificate_verification => true
  }
}

61.6.2 API-Schlüssel-Authentifizierung

output {
  opensearch {
    hosts => ["https://opensearch:9200"]
    # Key is read from the environment; it is a bearer credential and must be kept out of the pipeline file
    # NOTE(review): confirm api_key is supported by the installed logstash-output-opensearch version
    api_key => "${API_KEY}"
    # TLS with server-certificate verification, so the key is never sent over an unverified connection
    ssl => true
    ssl_certificate_verification => true
  }
}

61.7 Praktische Beispiele

61.7.1 Log-Aggregation Pipeline

input {
  # TLS listener for Beats shippers: the server presents its certificate; no ssl_verify_mode
  # is set, so client certificates are not requested (plugin default)
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/ssl/logstash.crt"
    ssl_key => "/etc/logstash/ssl/logstash.key"
  }
  
  # NOTE(review): no security_protocol given, so the Kafka connection uses the plugin
  # default (plaintext) — confirm for production
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["application-logs"]
    codec => json
  }
}

filter {
  # Only events whose "type" is nginx-access are parsed; neither input above sets "type",
  # so it has to come from the shipper or from the JSON payload
  if [type] == "nginx-access" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    
    # Geo-locate the client IP extracted by COMBINEDAPACHELOG
    geoip {
      source => "clientip"
      target => "geoip"
    }
    
    # Parse the raw User-Agent string; "agent" is the field name expected from
    # COMBINEDAPACHELOG — verify against the installed grok patterns
    useragent {
      source => "agent"
      target => "user_agent"
    }
  }
  
  # Runs for every event: converts the access-log timestamp format into @timestamp
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    target => "@timestamp"
  }
  
  # Drop the raw line and user-agent string once they have been parsed.
  # ${ENV} must be set in Logstash's environment; there is no default, so startup fails if it is missing.
  mutate {
    remove_field => ["message", "agent"]
    add_field => {
      "environment" => "${ENV}"
      "application" => "web-frontend"
    }
  }
}

output {
  opensearch {
    hosts => ["https://opensearch:9200"]
    # Credentials from the environment, daily indices
    user => "${OPENSEARCH_USER}"
    password => "${OPENSEARCH_PASSWORD}"
    index => "logs-%{+YYYY.MM.dd}"
    
    # Index template installed from file and overwritten on every start
    template => "/etc/logstash/templates/logs-template.json"
    template_name => "logs"
    template_overwrite => true
    
    bulk_size => 5000
    
    # Verify the server certificate against the CA
    ssl => true
    ssl_certificate_verification => true
    cacert => "/etc/logstash/ssl/ca.crt"
  }
}

61.7.2 Metriken-Verarbeitung Pipeline

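input {
  beats {
    port => 5045
    # NOTE(review): no ssl settings — plain TCP, unlike the 5044 listener in the log pipeline
    type => "metrics"
  }
}

filter {
  if [type] == "metrics" {
    ruby {
      # Copy every numeric top-level field, multiplied by 100, to [normalized][<field>].
      # The nested target has to be written as [normalized][name]: a bare suffix after
      # [normalized] is not a valid Logstash field reference.
      # NOTE(review): the factor 100 is presumably a percent conversion — confirm with the metric producer
      code => '
        event.to_hash.each do |k,v|
          if v.is_a?(Numeric)
            event.set("[normalized][#{k}]", v * 100)
          end
        end
      '
    }
    
    aggregate {
      # NOTE(review): with Beats, host is usually an object; %{host} stringifies the whole
      # object — consider keying on [host][name]
      task_id => "%{host}"
      code => "
        map['metrics_count'] ||= 0
        map['metrics_count'] += 1
        
        # Drop the first nine events of each host, then let events pass through
        event.cancel if map['metrics_count'] < 10
      "
      timeout => 120
    }
  }
}

output {
  opensearch {
    hosts => ["https://opensearch:9200"]
    # Authenticate and verify TLS exactly as the log pipeline does; an https host without
    # credentials or CA settings would otherwise be the odd one out
    user => "${OPENSEARCH_USER}"
    password => "${OPENSEARCH_PASSWORD}"
    ssl => true
    ssl_certificate_verification => true
    cacert => "/etc/logstash/ssl/ca.crt"
    index => "metrics-%{+YYYY.MM.dd}"
    template_name => "metrics"
  }
}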

61.8 Best Practices

  1. Pipeline-Design
  2. Performance-Optimierung
  3. Monitoring und Wartung
  4. Sicherheit

Diese Best Practices helfen Ihnen, eine robuste und effiziente Logstash-Integration mit OpenSearch zu implementieren.