60 Data Prepper in OpenSearch

Data Prepper ist ein serverseitiges Data Collector und Processor Framework, das speziell für die Verarbeitung und Transformation von Daten vor der Indexierung in OpenSearch entwickelt wurde. In diesem Kapitel lernen Sie, wie Sie Data Prepper effektiv für Ihre Datenpipelines einsetzen können.

60.1 Grundlegende Konzepte

Data Prepper basiert auf drei Hauptkomponenten: - Source: Woher die Daten kommen - Buffer: Wie die Daten zwischengespeichert werden - Sink: Wohin die Daten gehen

Diese Komponenten werden durch Processor-Pipelines verbunden, die die Daten transformieren und anreichern.

60.2 Installation und Konfiguration

60.2.1 Voraussetzungen

60.2.2 Installation über Docker

# docker-compose.yml
version: '3'
services:
  data-prepper:
    image: opensearchproject/data-prepper:latest
    volumes:
      - ./pipelines:/usr/share/data-prepper/pipelines
      - ./config:/usr/share/data-prepper/config
    ports:
      - "4900:4900"   # Für Dateneingabe
      - "4903:4903"   # Für Metriken
    environment:
      - "JAVA_OPTS=-Xmx512m -Xms512m"

60.2.3 Basis-Konfiguration

# config.yml
ssl: false
plugins:
  http:
    port: 4900
  metrics:
    port: 4903

log4j:
  appender:
    console:
      type: Console
      target: SYSTEM_OUT
      layout:
        type: PatternLayout
        pattern: "%d{ISO8601} [%t] %-5p %c{1} - %m%n"

60.3 Pipeline-Konfiguration

Eine typische Pipeline besteht aus mehreren Komponenten. Hier ein Beispiel für eine Log-Verarbeitungs-Pipeline:

# log-pipeline.yaml
pipeline:
  source:
    http:
      port: 4900
  
  buffer:
    bounded_blocking:
      buffer_size: 1024
      batch_size: 256
  
  processor:
    - grok:
        match:
          message: "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:log_level} %{GREEDYDATA:message}"
    
    - date:
        from_time_received: false
        source: timestamp
        destination: "@timestamp"
        formats:
          - "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
    
    - add_fields:
        fields:
          environment: "production"
          service: "user-service"
    
  sink:
    - opensearch:
        hosts: ["https://opensearch:9200"]
        username: "admin"
        password: "admin"
        index: "logs-%{yyyy.MM.dd}"

60.4 Prozessor-Typen und ihre Anwendung

60.4.1 Grok-Prozessor für Log-Parsing

processor:
  - grok:
      match:
        log: "%{IP:client_ip} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:status} %{NUMBER:bytes} %{NUMBER:duration}"
      pattern_definitions:
        DURATION: "[0-9]+[.][0-9]+"
      tag_on_failure: ["_grokparsefailure"]

60.4.2 Aggregations-Prozessor

processor:
  - aggregate:
      identification_keys: ["transaction_id"]
      action: combine
      group_duration: "60s"
      fields:
        - name: "total_amount"
          type: "sum"
          source: "amount"

60.4.3 Mutate-Prozessor für Feldmanipulation

processor:
  - mutate:
      add_field:
        processed_timestamp: "${new Date().format('yyyy-MM-dd HH:mm:ss.SSS')}"
      remove_field: ["temporary_field"]
      rename:
        old_field_name: "new_field_name"
      uppercase: ["status"]
      convert:
        response_time: "float"

60.5 Praktische Anwendungsbeispiele

60.5.1 Log-Aggregation Pipeline

Diese Pipeline sammelt Logs von verschiedenen Quellen und bereitet sie für die Analyse vor:

# log-aggregation-pipeline.yaml
pipeline:
  source:
    http:
      port: 4900
      
  buffer:
    bounded_blocking:
      buffer_size: 2048
      batch_size: 256
      
  processor:
    - grok:
        match:
          message: "%{TIMESTAMP_ISO8601:timestamp} \\[%{DATA:service}\\] %{LOGLEVEL:level} %{DATA:trace_id} - %{GREEDYDATA:log_message}"
    
    - date:
        source: "timestamp"
        destination: "@timestamp"
        formats: ["yyyy-MM-dd'T'HH:mm:ss.SSSZ"]
    
    - mutate:
        add_field:
          environment: "${ENV_TYPE}"
          datacenter: "${DC_LOCATION}"
    
    - drop:
        if: "ctx.level.toLowerCase() == 'debug' && ENV_TYPE == 'production'"
        
  sink:
    - opensearch:
        hosts: ["https://opensearch:9200"]
        index: "logs-%{service}-%{+yyyy.MM.dd}"
        username: "${ES_USERNAME}"
        password: "${ES_PASSWORD}"

60.5.2 Metriken-Verarbeitung

Eine Pipeline zur Verarbeitung von System-Metriken:

# metrics-pipeline.yaml
pipeline:
  source:
    otel_metrics_source:
      port: 4317
      
  buffer:
    bounded_blocking:
      buffer_size: 512
      batch_size: 64
      
  processor:
    - aggregate:
        identification_keys: ["host", "service"]
        action: mean
        group_duration: "1m"
        fields:
          - name: "cpu_usage"
            type: "double"
            source: "system.cpu.usage"
          - name: "memory_usage"
            type: "double"
            source: "system.memory.used_percent"
            
    - add_fields:
        fields:
          datacenter: "${DC_LOCATION}"
          environment: "${ENV_TYPE}"
          
  sink:
    - opensearch:
        hosts: ["https://opensearch:9200"]
        index: "metrics-%{+yyyy.MM}"
        template_file: "metrics-template.json"

60.5.3 Traces-Verarbeitung

Eine Pipeline zur Verarbeitung von Traces für Distributed Tracing:

# traces-pipeline.yaml
pipeline:
  source:
    otel_traces_source:
      port: 4317
      
  buffer:
    bounded_blocking:
      buffer_size: 1024
      batch_size: 128
      
  processor:
    - service_map_stateful:
        window_duration: "3m"
        
    - trace_peer_forwarder:
        discovery_mode: "dns"
        domain_name: "data-prepper"
        
  sink:
    - opensearch:
        hosts: ["https://opensearch:9200"]
        trace_analytics_raw: true
        trace_analytics_service_map: true
        ssl: true
        insecure: false

60.6 Monitoring und Wartung

60.6.1 Metriken-Endpunkt

Data Prepper exponiert Metriken über einen Prometheus-kompatiblen Endpunkt:

# monitoring-config.yaml
monitoring:
  metrics:
    prometheus:
      port: 4903
      host: "0.0.0.0"

Prometheus-Konfiguration:

# prometheus.yml
scrape_configs:
  - job_name: 'data_prepper'
    static_configs:
      - targets: ['data-prepper:4903']

60.6.2 Alerting einrichten

Beispiel für ein Grafana Alert:

# grafana-alert.yaml
groups:
  - name: DataPrepperAlerts
    rules:
      - alert: HighProcessingLatency
        expr: rate(data_prepper_pipeline_processing_time_milliseconds_sum[5m]) / rate(data_prepper_pipeline_processing_time_milliseconds_count[5m]) > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: High processing latency detected
          description: Pipeline processing latency is above 1 second

60.7 Best Practices

  1. Pipeline-Design
  2. Performance-Optimierung
  3. Sicherheit

60.8 Troubleshooting

Häufige Probleme und ihre Lösungen:

  1. Hohe Latenz
  2. Datenverlust
  3. Speicherlecks

60.9 Praktische Übungen

  1. Log-Pipeline implementieren Erstellen Sie eine Pipeline, die:
  2. Metriken-Verarbeitung Entwickeln Sie eine Pipeline für:
  3. Distributed Tracing Implementieren Sie Tracing mit: