Logstash is a powerful data processing tool that integrates seamlessly with OpenSearch. In this chapter you will learn how to use Logstash effectively with OpenSearch.
First, you need the OpenSearch output plugin for Logstash:
# Install the OpenSearch output plugin
bin/logstash-plugin install logstash-output-opensearch
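To confirm the installation, you can list the installed plugins from the Logstash home directory:

bin/logstash-plugin list | grep opensearch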
Logstash's JVM settings also deserve attention; a fixed heap with the G1 collector is a common starting point:

# config/jvm.options
-Xms2g
-Xmx2g
-XX:+UseG1GC
-XX:G1ReservePercent=25
-XX:InitiatingHeapOccupancyPercent=75
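Before the first start, it is worth validating a pipeline configuration; with --config.test_and_exit, Logstash parses the file and exits without processing (the path is an example):

bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/pipeline.conf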
A basic pipeline reads application logs from files, parses and enriches them, and writes them to OpenSearch:

input {
file {
path => "/var/log/application/*.log"
start_position => "beginning"
sincedb_path => "/var/lib/logstash/sincedb"
type => "application-logs"
}
}
filter {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \[%{DATA:service}\] %{DATA:trace_id} - %{GREEDYDATA:log_message}" }
}
date {
match => [ "timestamp", "ISO8601" ]
target => "@timestamp"
}
mutate {
add_field => {
"environment" => "production"
"datacenter" => "eu-central"
}
}
}
output {
opensearch {
hosts => ["https://opensearch:9200"]
user => "admin"
password => "admin"
ssl => true
ssl_certificate_verification => false # development only
index => "logs-%{+YYYY.MM.dd}"
# OpenSearch-specific settings
template_name => "logs"
template_overwrite => true
# Bulk request size follows pipeline.batch.size in logstash.yml;
# the plugin has no bulk_size option of its own
}
}
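Once saved (the path is an example), the pipeline can be started directly, and a quick _cat query confirms that daily indices appear (credentials from the example above):

bin/logstash -f /etc/logstash/conf.d/pipeline.conf
curl -k -u admin:admin "https://opensearch:9200/_cat/indices/logs-*?v"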
Beyond files, Logstash can receive events from many other sources. From Beats agents, over TLS with mandatory client certificates:

input {
beats {
port => 5044
ssl => true
ssl_certificate => "/etc/logstash/ssl/logstash.crt"
ssl_key => "/etc/logstash/ssl/logstash.key"
ssl_verify_mode => "force_peer"
}
}
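On the shipping side, a matching Filebeat configuration could look like this; the host name and certificate paths are assumptions, and force_peer means the client must present a certificate:

# filebeat.yml
output.logstash:
  hosts: ["logstash:5044"]
  ssl.certificate_authorities: ["/etc/filebeat/ca.crt"]
  ssl.certificate: "/etc/filebeat/filebeat.crt"
  ssl.key: "/etc/filebeat/filebeat.key"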
Consuming from Kafka topics, with JSON decoding and Kafka metadata decoration:

input {
kafka {
bootstrap_servers => "kafka1:9092,kafka2:9092"
topics => ["logs", "metrics"]
group_id => "logstash_consumers"
client_id => "logstash_client"
consumer_threads => 4
codec => json
decorate_events => true
max_poll_records => "5000"
max_poll_interval_ms => "300000"
}
}

Accepting events over HTTPS, for example from webhooks:

input {
http {
port => 8080
codec => json
ssl => true
ssl_certificate => "/etc/logstash/ssl/logstash.crt"
ssl_key => "/etc/logstash/ssl/logstash.key"
additional_codecs => {
"application/log" => "plain"
}
}
}
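A quick way to exercise the HTTP input is to post a JSON document to it (the host name is an assumption):

curl -k -X POST "https://logstash:8080" \
  -H "Content-Type: application/json" \
  -d '{"message": "test event", "level": "INFO"}'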
On the filter side, events can be parsed with custom patterns, type-converted, and conditionally tagged:

filter {
if [type] == "application-logs" {
grok {
patterns_dir => ["/etc/logstash/patterns"]
match => { "message" => "%{CUSTOM_LOG_FORMAT}" }
tag_on_failure => ["_grokparsefailure"]
}
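# /etc/logstash/patterns must define CUSTOM_LOG_FORMAT, e.g. a file
# containing a line like (hypothetical format):
# CUSTOM_LOG_FORMAT %{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:log_message}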
mutate {
convert => {
"response_time" => "float"
"bytes" => "integer"
}
}
if [status] =~ /^[45]/ {
mutate {
add_field => { "error_type" => "http_error" }
}
}
}
# compute ingestion lag relative to the event timestamp
ruby {
code => '
event.set("processing_time",
Time.now.to_f - event.get("@timestamp").to_f
)
'
}
translate {
field => "user_id"
destination => "user_group"
dictionary_path => "/etc/logstash/dictionaries/users.yml"
refresh_interval => 300
}
}
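The dictionary used by the translate filter above is a plain YAML map from user_id to user_group; a minimal sketch with made-up values:

# /etc/logstash/dictionaries/users.yml
"1001": "internal"
"1002": "partner"
"1003": "customer"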
Aggregations and in-pipeline metrics are possible as well:

filter {
aggregate {
task_id => "%{id}"
code => "
map['requests'] ||= 0
map['requests'] += 1
map['total_time'] ||= 0
map['total_time'] += event.get('response_time').to_f
event.cancel if map['requests'] < 10
event.set('avg_response_time', map['total_time'] / map['requests'])
"
timeout => 120
push_previous_map_as_event => true
}
metrics {
meter => ["requests"]
# the metrics filter supports only meter and timer;
# timer entries are name/value pairs
timer => ["request_duration", "%{response_time}"]
add_tag => "metrics"
}
}
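The metrics filter emits new events at each flush interval, tagged via add_tag; routing them to their own index keeps them out of the log indices. A sketch reusing the connection details from the surrounding examples:

output {
  if "metrics" in [tags] {
    opensearch {
      hosts => ["https://opensearch:9200"]
      index => "pipeline-metrics-%{+YYYY.MM.dd}"
    }
  }
}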
A production-grade output keeps credentials out of the pipeline file and tunes templates, retries, pooling, and TLS explicitly:

output {
opensearch {
hosts => ["https://opensearch:9200"]
user => "${OPENSEARCH_USER}"
password => "${OPENSEARCH_PASSWORD}"
# Index settings
index => "logs-%{+YYYY.MM}"
manage_template => true
template => "/etc/logstash/templates/logs-template.json"
template_name => "logs"
template_overwrite => true
# bulk request size follows pipeline.batch.size (see logstash.yml below)
# Retry settings (seconds)
retry_initial_interval => 2
retry_max_interval => 64
retry_on_conflict => 5
# Connection pool settings
pool_max => 4
pool_max_per_route => 2
# Timeout in seconds
timeout => 120
# SSL/TLS settings
ssl => true
ssl_certificate_verification => true
cacert => "/etc/logstash/ssl/ca.crt"
}
}
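The template file referenced above as /etc/logstash/templates/logs-template.json could look roughly like this; field names and settings are illustrative:

{
  "index_patterns": ["logs-*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "@timestamp": { "type": "date" },
      "level": { "type": "keyword" },
      "service": { "type": "keyword" },
      "log_message": { "type": "text" }
    }
  }
}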
Events that OpenSearch rejects with un-retryable errors can be captured in Logstash's dead letter queue (DLQ). It is enabled globally in logstash.yml:

# config/logstash.yml
dead_letter_queue.enable: true
path.dead_letter_queue: "/var/lib/logstash/dlq"
dead_letter_queue.max_bytes: 1gb

The DLQ is a global Logstash feature rather than an option of the opensearch output: once enabled, the output diverts events that the cluster rejects with a mapping error (HTTP 400 or 404) into the queue instead of dropping them.
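Captured events can later be re-read and reprocessed with the dead_letter_queue input plugin; a minimal sketch:

input {
  dead_letter_queue {
    path => "/var/lib/logstash/dlq"
    pipeline_id => "main"
    commit_offsets => true
  }
}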
Logstash's internal monitoring can ship pipeline statistics to a separate cluster; the setting names stem from Logstash's Elasticsearch heritage:

# config/logstash.yml
monitoring.enabled: true
monitoring.elasticsearch.hosts: ["https://monitoring-opensearch:9200"]
monitoring.elasticsearch.username: "logstash_system"
monitoring.elasticsearch.password: "${MONITORING_PASSWORD}"

Throughput is tuned through worker count, batch size, and the queue type:

# config/logstash.yml
pipeline.workers: 4
pipeline.batch.size: 125
pipeline.batch.delay: 50
queue.type: persisted
queue.max_bytes: 1gb
Individual plugins bring their own tuning options, for example the Beats input:
input {
beats {
port => 5044
client_inactivity_timeout => 60
receive_buffer_bytes => 33554432
}
}
filter {
ruby {
code => "..."
path => "/etc/logstash/scripts/custom_processing.rb"
script_params => { "cache_size" => 10000 }
}
}
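A script loaded via path must implement the ruby filter's script contract: register(params) receives script_params, and filter(event) returns the list of events to emit. A minimal sketch of the hypothetical custom_processing.rb:

# /etc/logstash/scripts/custom_processing.rb
def register(params)
  # receives script_params from the pipeline definition
  @cache_size = params["cache_size"]
end

def filter(event)
  # illustrative transformation only
  event.set("cache_size_hint", @cache_size)
  [event]
end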
For mutual TLS towards OpenSearch, the output verifies the server certificate and presents a client certificate; the plugin names the client-side options tls_certificate and tls_key:

output {
opensearch {
hosts => ["https://opensearch:9200"]
ssl => true
cacert => "/etc/logstash/ssl/ca.crt"
tls_certificate => "/etc/logstash/ssl/logstash.crt"
tls_key => "/etc/logstash/ssl/logstash.key"
ssl_certificate_verification => true
}
}

OpenSearch has no Elasticsearch-style API keys, and the opensearch output offers no api_key option; for Amazon OpenSearch Service, the plugin supports IAM-signed requests via auth_type instead (the region and credential variables are examples):

output {
opensearch {
hosts => ["https://opensearch:9200"]
auth_type => {
type => "aws_iam"
aws_access_key_id => "${AWS_ACCESS_KEY_ID}"
aws_secret_access_key => "${AWS_SECRET_ACCESS_KEY}"
region => "eu-central-1"
}
ssl => true
ssl_certificate_verification => true
}
}

A complete example for log processing combines several inputs with parsing, enrichment, and a templated output:

input {
beats {
port => 5044
ssl => true
ssl_certificate => "/etc/logstash/ssl/logstash.crt"
ssl_key => "/etc/logstash/ssl/logstash.key"
}
kafka {
bootstrap_servers => "kafka:9092"
topics => ["application-logs"]
codec => json
}
}
filter {
if [type] == "nginx-access" {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
geoip {
source => "clientip"
target => "geoip"
}
useragent {
source => "agent"
target => "user_agent"
}
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
}
mutate {
remove_field => ["message", "agent"]
add_field => {
"environment" => "${ENV}"
"application" => "web-frontend"
}
}
}
output {
opensearch {
hosts => ["https://opensearch:9200"]
user => "${OPENSEARCH_USER}"
password => "${OPENSEARCH_PASSWORD}"
index => "logs-%{+YYYY.MM.dd}"
template => "/etc/logstash/templates/logs-template.json"
template_name => "logs"
template_overwrite => true
# bulk request size follows pipeline.batch.size
ssl => true
ssl_certificate_verification => true
cacert => "/etc/logstash/ssl/ca.crt"
}
}
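When logs and metrics run as separate pipelines, pipelines.yml keeps them isolated from each other (file paths are assumptions):

# config/pipelines.yml
- pipeline.id: logs
  path.config: "/etc/logstash/conf.d/logs.conf"
- pipeline.id: metrics
  path.config: "/etc/logstash/conf.d/metrics.conf"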
A second pipeline can process metrics independently, listening on its own Beats port:

input {
beats {
port => 5045
type => "metrics"
}
}
filter {
if [type] == "metrics" {
# copy numeric fields, scaled, into a [normalized] sub-document
ruby {
code => '
event.to_hash.each do |k,v|
if v.is_a?(Numeric)
event.set("[normalized]#{k}", v * 100)
end
end
'
}
aggregate {
task_id => "%{host}"
code => "
map['metrics_count'] ||= 0
map['metrics_count'] += 1
event.cancel if map['metrics_count'] < 10
"
timeout => 120
}
}
}
output {
opensearch {
hosts => ["https://opensearch:9200"]
index => "metrics-%{+YYYY.MM.dd}"
template_name => "metrics"
}
}

These best practices will help you implement a robust and efficient Logstash integration with OpenSearch.