20 Ingest-APIs

20.1 Grundlegendes Verständnis

Stellen Sie sich Ingest-Pipelines als Fließband in einer Fabrik vor: Bevor die Rohmaterialien (Ihre Daten) ihren endgültigen Platz (den Index) erreichen, durchlaufen sie verschiedene Bearbeitungsstationen (Prozessoren), die jeweils spezifische Transformationen vornehmen.

20.1.1 Warum Ingest-Pipelines?

Ingest-Pipelines lösen häufige Herausforderungen bei der Datenverarbeitung:

20.2 Aufbau einer Pipeline

20.2.1 Grundstruktur

Eine Pipeline besteht aus zwei Hauptkomponenten:

PUT /_ingest/pipeline/meine-pipeline
{
  "description": "Eine aussagekräftige Beschreibung der Pipeline",
  "processors": [
    {
      "prozessor-name": {
        "field": "feldname",
        "weitere": "parameter"
      }
    }
  ]
}

20.2.2 Die wichtigsten Prozessoren

  1. Set-Prozessor: Setzt oder ändert Feldwerte
{
  "set": {
    "field": "kategorie",
    "value": "standard"
  }
}
  1. Convert-Prozessor: Konvertiert Datentypen
{
  "convert": {
    "field": "preis",
    "type": "float"
  }
}
  1. Date-Prozessor: Verarbeitet Datums-Strings
{
  "date": {
    "field": "erstellungsdatum",
    "formats": ["dd.MM.yyyy", "yyyy-MM-dd"],
    "target_field": "erstellungsdatum_iso"
  }
}

20.3 Fortgeschrittene Konzepte

20.3.1 Bedingte Ausführung

Prozessoren können basierend auf Bedingungen ausgeführt werden:

{
  "set": {
    "if": "ctx.preis > 100",
    "field": "preiskategorie",
    "value": "premium"
  }
}

20.3.2 Template-Unterstützung

Dynamische Werte durch Templates:

{
  "set": {
    "field": "vollerName",
    "value": "{{vorname}} {{nachname}}"
  }
}

20.4 Praktische Anwendungsfälle

20.4.1 Log-Verarbeitung

PUT /_ingest/pipeline/log-pipeline
{
  "description": "Bereitet Log-Einträge auf",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"]
      }
    },
    {
      "date": {
        "field": "timestamp",
        "formats": ["ISO8601"]
      }
    },
    {
      "set": {
        "field": "processed_timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}

20.4.2 Datennormalisierung für E-Commerce

PUT /_ingest/pipeline/produkt-pipeline
{
  "description": "Normalisiert Produktdaten",
  "processors": [
    {
      "uppercase": {
        "field": "produktcode"
      }
    },
    {
      "convert": {
        "field": "preis",
        "type": "double"
      }
    },
    {
      "script": {
        "source": "ctx.preis_netto = ctx.preis / 1.19"
      }
    }
  ]
}

20.5 Pipeline-Verwaltung

20.5.1 Pipeline testen

Der Simulate-Endpoint ermöglicht das Testen von Pipelines:

POST /_ingest/pipeline/meine-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "feldname": "testwert"
      }
    }
  ]
}

20.5.2 Pipeline-Performance

Pipeline-Performance optimieren:

  1. Prozessorreihenfolge optimieren:
{
  "processors": [
    {
      "script": {
        "description": "Leichte Operationen zuerst",
        "source": "ctx.einfache_berechnung = ctx.wert * 2"
      }
    },
    {
      "script": {
        "description": "Komplexe Operationen später",
        "source": "ctx.komplexe_berechnung = komplexe_funktion(ctx.wert)"
      }
    }
  ]
}

20.6 Fehlerbehandlung

20.6.1 On-Failure-Handler

Fehlerbehandlung für einzelne Prozessoren:

{
  "date": {
    "field": "timestamp",
    "formats": ["dd/MM/yyyy"],
    "on_failure": [
      {
        "set": {
          "field": "error_message",
          "value": "Ungültiges Datumsformat in {{_ingest._value}}"
        }
      }
    ]
  }
}

20.6.2 Pipeline-weite Fehlerbehandlung

PUT /_ingest/pipeline/robuste-pipeline
{
  "description": "Pipeline mit umfassender Fehlerbehandlung",
  "processors": [...],
  "on_failure": [
    {
      "set": {
        "field": "processing_error",
        "value": "Pipeline-Fehler: {{_ingest.on_failure_message}}"
      }
    }
  ]
}

20.7 Best Practices

20.7.1 Pipeline-Design

  1. Modulare Struktur:
// Basis-Pipeline
PUT /_ingest/pipeline/basis
{
  "processors": [
    {
      "set": {
        "field": "processed_date",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}

// Spezifische Pipeline
PUT /_ingest/pipeline/spezifisch
{
  "processors": [
    {
      "pipeline": {
        "name": "basis"
      }
    },
    {
      "set": {
        "field": "type",
        "value": "spezial"
      }
    }
  ]
}

20.7.2 Performance-Optimierung

  1. Caching nutzen:
{
  "script": {
    "lang": "painless",
    "id": "mein-cached-script",  // Vorcompiliertes Script
    "params": {
      "faktor": 1.19
    }
  }
}

20.8 Monitoring und Wartung

20.8.1 Pipeline-Statistiken

Pipeline-Statistiken überwachen:

GET /_ingest/pipeline/_stats

20.8.2 Pipeline-Dokumentation

Gute Pipeline-Dokumentation durch aussagekräftige Beschreibungen:

PUT /_ingest/pipeline/gut-dokumentiert
{
  "description": "Diese Pipeline verarbeitet Kundendaten: \n- Normalisiert Namen\n- Validiert E-Mails\n- Berechnet Kundenalter",
  "processors": [
    {
      "uppercase": {
        "description": "Konvertiert Namen in Großbuchstaben für einheitliche Suche",
        "field": "name"
      }
    }
  ]
}

20.9 Debugging und Troubleshooting

20.9.1 Verbose-Simulation

Detaillierte Simulation für Debugging:

POST /_ingest/pipeline/meine-pipeline/_simulate?verbose=true
{
  "docs": [
    {
      "_source": {
        "test": "wert"
      }
    }
  ]
}

20.9.2 Prozessor-Analyse

Einzelne Prozessoren analysieren:

GET /_ingest/processor

20.10 Best Practices

  1. Pipeline-Design:

  2. Performance:

  3. Wartbarkeit:

Ingest-Pipelines sind ein mächtiges Werkzeug zur Datentransformation. Mit sorgfältiger Planung und Implementierung können sie die Datenqualität erheblich verbessern und den Indexierungsprozess optimieren.