Data Prepper is a server-side data collection and processing framework built specifically to process and transform data before it is indexed into OpenSearch. In this chapter you will learn how to use Data Prepper effectively for your data pipelines.
Data Prepper is built around three main components:

- Source: where the data comes from
- Buffer: how the data is held in transit
- Sink: where the data goes

These components are connected by processor pipelines that transform and enrich the data.
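To make this structure concrete, here is a minimal sketch of a pipeline definition. It uses the random demo source and stdout sink that ship with Data Prepper; the add_entries processor is included only to show where processors slot in, and the file name is illustrative:

# minimal-pipeline.yaml (illustrative sketch)
demo-pipeline:
  source:
    random:                  # demo source that emits random string events
  buffer:
    bounded_blocking:        # default in-memory buffer
      buffer_size: 512       # maximum number of events held
      batch_size: 64         # events handed downstream per batch
  processor:
    - add_entries:           # trivial enrichment, marking the processor stage
        entries:
          - key: "pipeline_stage"
            value: "demo"
  sink:
    - stdout:                # print each processed event to the console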
Data Prepper is most easily run as a container. A typical Docker Compose setup looks like this:

# docker-compose.yml
version: '3'
services:
  data-prepper:
    image: opensearchproject/data-prepper:latest
    volumes:
      - ./pipelines:/usr/share/data-prepper/pipelines
      - ./config:/usr/share/data-prepper/config
    ports:
      - "4900:4900"   # for data ingestion
      - "4903:4903"   # for metrics
    environment:
      - "JAVA_OPTS=-Xmx512m -Xms512m"
The server itself is configured in config.yml:

# config.yml
ssl: false
plugins:
  http:
    port: 4900
  metrics:
    port: 4903
log4j:
  appender:
    console:
      type: Console
      target: SYSTEM_OUT
      layout:
        type: PatternLayout
        pattern: "%d{ISO8601} [%t] %-5p %c{1} - %m%n"

A typical pipeline consists of several components. Here is an example of a log-processing pipeline:
# log-pipeline.yaml
pipeline:
  source:
    http:
      port: 4900
  buffer:
    bounded_blocking:
      buffer_size: 1024
      batch_size: 256
  processor:
    - grok:
        match:
          message: ["%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:log_level} %{GREEDYDATA:message}"]
    - date:
        from_time_received: false
        match:
          - key: timestamp
            patterns: ["yyyy-MM-dd'T'HH:mm:ss.SSSZ"]
        destination: "@timestamp"
    - add_fields:
        fields:
          environment: "production"
          service: "user-service"
  sink:
    - opensearch:
        hosts: ["https://opensearch:9200"]
        username: "admin"
        password: "admin"
        index: "logs-%{yyyy.MM.dd}"
The grok processor also supports custom pattern definitions and failure tagging, for example when parsing access logs:

processor:
  - grok:
      match:
        log: ["%{IP:client_ip} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:status} %{NUMBER:bytes} %{NUMBER:duration}"]
      pattern_definitions:
        DURATION: "[0-9]+[.][0-9]+"
      tag_on_failure: ["_grokparsefailure"]
With the aggregate processor, events can be grouped over a time window, here to total up amounts per transaction:

processor:
  - aggregate:
      identification_keys: ["transaction_id"]
      action: combine
      group_duration: "60s"
      fields:
        - name: "total_amount"
          type: "sum"
          source: "amount"
The mutate processor bundles common field operations such as adding, removing, renaming, and converting fields:

processor:
  - mutate:
      add_field:
        processed_timestamp: "${new Date().format('yyyy-MM-dd HH:mm:ss.SSS')}"
      remove_field: ["temporary_field"]
      rename:
        old_field_name: "new_field_name"
      uppercase: ["status"]
      convert:
        response_time: "float"

The following pipeline collects logs from various sources and prepares them for analysis:
# log-aggregation-pipeline.yaml
pipeline:
  source:
    http:
      port: 4900
  buffer:
    bounded_blocking:
      buffer_size: 2048
      batch_size: 256
  processor:
    - grok:
        match:
          message: ["%{TIMESTAMP_ISO8601:timestamp} \\[%{DATA:service}\\] %{LOGLEVEL:level} %{DATA:trace_id} - %{GREEDYDATA:log_message}"]
    - date:
        match:
          - key: timestamp
            patterns: ["yyyy-MM-dd'T'HH:mm:ss.SSSZ"]
        destination: "@timestamp"
    - mutate:
        add_field:
          environment: "${ENV_TYPE}"
          datacenter: "${DC_LOCATION}"
    - drop:
        if: "ctx.level.toLowerCase() == 'debug' && ENV_TYPE == 'production'"
  sink:
    - opensearch:
        hosts: ["https://opensearch:9200"]
        index: "logs-%{service}-%{yyyy.MM.dd}"
        username: "${ES_USERNAME}"
        password: "${ES_PASSWORD}"

A pipeline for processing system metrics:
# metrics-pipeline.yaml
pipeline:
  source:
    otel_metrics_source:
      port: 4317
  buffer:
    bounded_blocking:
      buffer_size: 512
      batch_size: 64
  processor:
    - aggregate:
        identification_keys: ["host", "service"]
        action: mean
        group_duration: "1m"
        fields:
          - name: "cpu_usage"
            type: "double"
            source: "system.cpu.usage"
          - name: "memory_usage"
            type: "double"
            source: "system.memory.used_percent"
    - add_fields:
        fields:
          datacenter: "${DC_LOCATION}"
          environment: "${ENV_TYPE}"
  sink:
    - opensearch:
        hosts: ["https://opensearch:9200"]
        index: "metrics-%{yyyy.MM}"
        template_file: "metrics-template.json"

A pipeline for processing traces for distributed tracing:
# traces-pipeline.yaml
pipeline:
  source:
    otel_trace_source:
      port: 4317
  buffer:
    bounded_blocking:
      buffer_size: 1024
      batch_size: 128
  processor:
    - service_map_stateful:
        window_duration: "3m"
    - trace_peer_forwarder:
        discovery_mode: "dns"
        domain_name: "data-prepper"
  sink:
    - opensearch:
        hosts: ["https://opensearch:9200"]
        trace_analytics_raw: true
        trace_analytics_service_map: true
        ssl: true
        insecure: false

Data Prepper exposes its metrics via a Prometheus-compatible endpoint:
# monitoring-config.yaml
monitoring:
  metrics:
    prometheus:
      port: 4903
      host: "0.0.0.0"

Prometheus configuration:
# prometheus.yml
scrape_configs:
  - job_name: 'data_prepper'
    static_configs:
      - targets: ['data-prepper:4903']

An example Grafana alert:
# grafana-alert.yaml
groups:
  - name: DataPrepperAlerts
    rules:
      - alert: HighProcessingLatency
        expr: rate(data_prepper_pipeline_processing_time_milliseconds_sum[5m]) / rate(data_prepper_pipeline_processing_time_milliseconds_count[5m]) > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: High processing latency detected
          description: Pipeline processing latency is above 1 second

Common problems and their solutions: